diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 4fa47dc403cc9..c63e874c1bc15 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -21,41 +21,33 @@ use crate::{ chain_head::{ api::ChainHeadApiServer, + chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, - event::{ - BestBlockChanged, ChainHeadEvent, ChainHeadResult, ErrorEvent, Finalized, FollowEvent, - Initialized, NetworkConfig, NewBlock, RuntimeEvent, RuntimeVersionEvent, - }, - subscription::{SubscriptionHandle, SubscriptionManagement, SubscriptionManagementError}, + event::{ChainHeadEvent, ChainHeadResult, ErrorEvent, FollowEvent, NetworkConfig}, + subscription::SubscriptionManagement, }, SubscriptionTaskExecutor, }; use codec::Encode; -use futures::{ - channel::oneshot, - future::FutureExt, - stream::{self, Stream, StreamExt}, -}; -use futures_util::future::Either; +use futures::future::FutureExt; use jsonrpsee::{ core::{async_trait, RpcResult}, types::{SubscriptionEmptyError, SubscriptionId, SubscriptionResult}, SubscriptionSink, }; -use log::{debug, error}; +use log::debug; use sc_client_api::{ - Backend, BlockBackend, BlockImportNotification, BlockchainEvents, CallExecutor, ChildInfo, - ExecutorProvider, FinalityNotification, StorageKey, StorageProvider, + Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey, + StorageProvider, }; -use serde::Serialize; use sp_api::CallApiAt; -use sp_blockchain::{ - Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, -}; +use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}; use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, traits::CallContext, Bytes}; -use sp_runtime::traits::{Block as BlockT, Header}; +use sp_runtime::traits::Block as BlockT; use std::{marker::PhantomData, sync::Arc}; +pub(crate) const LOG_TARGET: &str = "rpc-spec-v2"; + /// An API for chain head RPC calls. pub struct ChainHead { /// Substrate client. @@ -119,64 +111,6 @@ impl ChainHead { } } -/// Generate the initial events reported by the RPC `follow` method. -/// -/// This includes the "Initialized" event followed by the in-memory -/// blocks via "NewBlock" and the "BestBlockChanged". -fn generate_initial_events( - client: &Arc, - backend: &Arc, - handle: &SubscriptionHandle, - runtime_updates: bool, -) -> Result>, SubscriptionManagementError> -where - Block: BlockT + 'static, - Block::Header: Unpin, - BE: Backend + 'static, - Client: HeaderBackend + CallApiAt + 'static, -{ - // The initialized event is the first one sent. - let finalized_block_hash = client.info().finalized_hash; - handle.pin_block(finalized_block_hash)?; - - let finalized_block_runtime = - generate_runtime_event(&client, runtime_updates, finalized_block_hash, None); - - let initialized_event = FollowEvent::Initialized(Initialized { - finalized_block_hash, - finalized_block_runtime, - runtime_updates, - }); - - let initial_blocks = get_initial_blocks(&backend, finalized_block_hash); - let mut in_memory_blocks = Vec::with_capacity(initial_blocks.len() + 1); - - in_memory_blocks.push(initialized_event); - for (child, parent) in initial_blocks.into_iter() { - handle.pin_block(child)?; - - let new_runtime = generate_runtime_event(&client, runtime_updates, child, Some(parent)); - - let event = FollowEvent::NewBlock(NewBlock { - block_hash: child, - parent_block_hash: parent, - new_runtime, - runtime_updates, - }); - - in_memory_blocks.push(event); - } - - // Generate a new best block event. - let best_block_hash = client.info().best_hash; - if best_block_hash != finalized_block_hash { - let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }); - in_memory_blocks.push(best_block); - }; - - Ok(in_memory_blocks) -} - /// Parse hex-encoded string parameter as raw bytes. /// /// If the parsing fails, the subscription is rejected. @@ -198,243 +132,6 @@ fn parse_hex_param( } } -/// Conditionally generate the runtime event of the given block. -fn generate_runtime_event( - client: &Arc, - runtime_updates: bool, - block: Block::Hash, - parent: Option, -) -> Option -where - Block: BlockT + 'static, - Client: CallApiAt + 'static, -{ - // No runtime versions should be reported. - if !runtime_updates { - return None - } - - let block_rt = match client.runtime_version_at(block) { - Ok(rt) => rt, - Err(err) => return Some(err.into()), - }; - - let parent = match parent { - Some(parent) => parent, - // Nothing to compare against, always report. - None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt })), - }; - - let parent_rt = match client.runtime_version_at(parent) { - Ok(rt) => rt, - Err(err) => return Some(err.into()), - }; - - // Report the runtime version change. - if block_rt != parent_rt { - Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt })) - } else { - None - } -} - -/// Get the in-memory blocks of the client, starting from the provided finalized hash. -/// -/// Returns a tuple of block hash with parent hash. -fn get_initial_blocks( - backend: &Arc, - parent_hash: Block::Hash, -) -> Vec<(Block::Hash, Block::Hash)> -where - Block: BlockT + 'static, - BE: Backend + 'static, -{ - let mut result = Vec::new(); - let mut next_hash = Vec::new(); - next_hash.push(parent_hash); - - while let Some(parent_hash) = next_hash.pop() { - let Ok(blocks) = backend.blockchain().children(parent_hash) else { - continue - }; - - for child_hash in blocks { - result.push((child_hash, parent_hash)); - next_hash.push(child_hash); - } - } - - result -} - -/// Submit the events from the provided stream to the RPC client -/// for as long as the `rx_stop` event was not called. -async fn submit_events( - sink: &mut SubscriptionSink, - mut stream: EventStream, - rx_stop: oneshot::Receiver<()>, -) where - EventStream: Stream + Unpin, - T: Serialize, -{ - let mut stream_item = stream.next(); - let mut stop_event = rx_stop; - - while let Either::Left((Some(event), next_stop_event)) = - futures_util::future::select(stream_item, stop_event).await - { - match sink.send(&event) { - Ok(true) => { - stream_item = stream.next(); - stop_event = next_stop_event; - }, - // Client disconnected. - Ok(false) => return, - Err(_) => { - // Failed to submit event. - break - }, - } - } - - let _ = sink.send(&FollowEvent::::Stop); -} - -/// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for -/// every notification. -fn handle_import_blocks( - client: &Arc, - handle: &SubscriptionHandle, - runtime_updates: bool, - notification: BlockImportNotification, -) -> Result<(FollowEvent, Option>), SubscriptionManagementError> -where - Block: BlockT + 'static, - Client: CallApiAt + 'static, -{ - handle.pin_block(notification.hash)?; - - let new_runtime = generate_runtime_event( - &client, - runtime_updates, - notification.hash, - Some(*notification.header.parent_hash()), - ); - - // Note: `Block::Hash` will serialize to hexadecimal encoded string. - let new_block = FollowEvent::NewBlock(NewBlock { - block_hash: notification.hash, - parent_block_hash: *notification.header.parent_hash(), - new_runtime, - runtime_updates, - }); - - if !notification.is_new_best { - return Ok((new_block, None)) - } - - // If this is the new best block, then we need to generate two events. - let best_block_event = - FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: notification.hash }); - - let mut best_block_cache = handle.best_block_write(); - match *best_block_cache { - Some(block_cache) => { - // The RPC layer has not reported this block as best before. - // Note: This handles the race with the finalized branch. - if block_cache != notification.hash { - *best_block_cache = Some(notification.hash); - Ok((new_block, Some(best_block_event))) - } else { - Ok((new_block, None)) - } - }, - None => { - *best_block_cache = Some(notification.hash); - Ok((new_block, Some(best_block_event))) - }, - } -} - -/// Generate the "Finalized" event and potentially the "BestBlockChanged" for -/// every notification. -fn handle_finalized_blocks( - client: &Arc, - handle: &SubscriptionHandle, - notification: FinalityNotification, -) -> Result<(FollowEvent, Option>), SubscriptionManagementError> -where - Block: BlockT + 'static, - Client: HeaderBackend + HeaderMetadata + 'static, -{ - let last_finalized = notification.hash; - // We might not receive all new blocks reports, also pin the block here. - handle.pin_block(last_finalized)?; - - // The tree route contains the exclusive path from the last finalized block - // to the block reported by the notification. Ensure the finalized block is - // properly reported to that path. - let mut finalized_block_hashes = notification.tree_route.iter().cloned().collect::>(); - finalized_block_hashes.push(last_finalized); - - let pruned_block_hashes: Vec<_> = notification.stale_heads.iter().cloned().collect(); - - let finalized_event = FollowEvent::Finalized(Finalized { - finalized_block_hashes, - pruned_block_hashes: pruned_block_hashes.clone(), - }); - - let mut best_block_cache = handle.best_block_write(); - match *best_block_cache { - Some(block_cache) => { - // Check if the current best block is also reported as pruned. - let reported_pruned = pruned_block_hashes.iter().find(|&&hash| hash == block_cache); - if reported_pruned.is_none() { - return Ok((finalized_event, None)) - } - - // The best block is reported as pruned. Therefore, we need to signal a new - // best block event before submitting the finalized event. - let best_block_hash = client.info().best_hash; - if best_block_hash == block_cache { - // The client doest not have any new information about the best block. - // The information from `.info()` is updated from the DB as the last - // step of the finalization and it should be up to date. Also, the - // displaced nodes (list of nodes reported) should be reported with - // an offset of 32 blocks for substrate. - // If the info is outdated, there is nothing the RPC can do for now. - error!(target: "rpc-spec-v2", "Client does not contain different best block"); - Ok((finalized_event, None)) - } else { - let ancestor = sp_blockchain::lowest_common_ancestor( - &**client, - last_finalized, - best_block_hash, - ) - .map_err(|_| { - SubscriptionManagementError::Custom("Could not find common ancestor".into()) - })?; - - // The client's best block must be a descendent of the last finalized block. - // In other words, the lowest common ancestor must be the last finalized block. - if ancestor.hash != last_finalized { - return Err(SubscriptionManagementError::Custom( - "The finalized block is not an ancestor of the best block".into(), - )) - } - - // The RPC needs to also submit a new best block changed before the - // finalized event. - *best_block_cache = Some(best_block_hash); - let best_block_event = - FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }); - Ok((finalized_event, Some(best_block_event))) - } - }, - None => Ok((finalized_event, None)), - } -} - #[async_trait] impl ChainHeadApiServer for ChainHead where @@ -467,71 +164,28 @@ where let Some((rx_stop, sub_handle)) = self.subscriptions.insert_subscription(sub_id.clone(), runtime_updates, self.max_pinned_blocks) else { // Inserting the subscription can only fail if the JsonRPSee // generated a duplicate subscription ID. - debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription already accepted", sub_id); + debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id); let _ = sink.send(&FollowEvent::::Stop); return Ok(()) }; - debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription accepted", sub_id); + debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id); - let client = self.client.clone(); - let handle = sub_handle.clone(); - let subscription_id = sub_id.clone(); - - let stream_import = self - .client - .import_notification_stream() - .map(move |notification| { - match handle_import_blocks(&client, &handle, runtime_updates, notification) { - Ok((new_block, None)) => stream::iter(vec![new_block]), - Ok((new_block, Some(best_block))) => stream::iter(vec![new_block, best_block]), - Err(_) => { - debug!(target: "rpc-spec-v2", "[follow][id={:?}] Failed to handle block import notification.", subscription_id); - handle.stop(); - stream::iter(vec![]) - }, - } - }) - .flatten(); - - let client = self.client.clone(); - let handle = sub_handle.clone(); - let subscription_id = sub_id.clone(); - - let stream_finalized = self - .client - .finality_notification_stream() - .map(move |notification| { - match handle_finalized_blocks(&client, &handle, notification) { - Ok((finalized_event, None)) => stream::iter(vec![finalized_event]), - Ok((finalized_event, Some(best_block))) => - stream::iter(vec![best_block, finalized_event]), - Err(_) => { - debug!(target: "rpc-spec-v2", "[follow][id={:?}] Failed to import finalized blocks", subscription_id); - handle.stop(); - stream::iter(vec![]) - }, - } - }) - .flatten(); - - let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized); let subscriptions = self.subscriptions.clone(); - let client = self.client.clone(); let backend = self.backend.clone(); + let client = self.client.clone(); let fut = async move { - let Ok(initial_events) = generate_initial_events(&client, &backend, &sub_handle, runtime_updates) else { - // Stop the subscription if we exceeded the maximum number of blocks pinned. - debug!(target: "rpc-spec-v2", "[follow][id={:?}] Exceeded max pinned blocks from initial events", sub_id); - let _ = sink.send(&FollowEvent::::Stop); - return - }; + let mut chain_head_follow = ChainHeadFollower::new( + client, + backend, + sub_handle, + runtime_updates, + sub_id.clone(), + ); - let stream = stream::iter(initial_events).chain(merged); + chain_head_follow.generate_events(sink, rx_stop).await; - submit_events(&mut sink, stream.boxed(), rx_stop).await; - // The client disconnected or called the unsubscribe method. subscriptions.remove_subscription(&sub_id); - debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription removed", sub_id); + debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription removed", sub_id); }; self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed()); @@ -569,7 +223,12 @@ where }, Ok(None) => { // The block's body was pruned. This subscription ID has become invalid. - debug!(target: "rpc-spec-v2", "[body][id={:?}] Stopping subscription because hash={:?} was pruned", follow_subscription, hash); + debug!( + target: LOG_TARGET, + "[body][id={:?}] Stopping subscription because hash={:?} was pruned", + follow_subscription, + hash + ); handle.stop(); ChainHeadEvent::::Disjoint }, diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs new file mode 100644 index 0000000000000..9173b7340b7e5 --- /dev/null +++ b/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -0,0 +1,644 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Implementation of the `chainHead_follow` method. + +use crate::chain_head::{ + chain_head::LOG_TARGET, + event::{ + BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock, RuntimeEvent, + RuntimeVersionEvent, + }, + subscription::{SubscriptionHandle, SubscriptionManagementError}, +}; +use futures::{ + channel::oneshot, + stream::{self, Stream, StreamExt}, +}; +use futures_util::future::Either; +use jsonrpsee::SubscriptionSink; +use log::{debug, error}; +use sc_client_api::{ + Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification, +}; +use sp_api::CallApiAt; +use sp_blockchain::{ + Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata, Info, +}; +use sp_runtime::{ + traits::{Block as BlockT, Header as HeaderT, One}, + Saturating, +}; +use std::{collections::HashSet, sync::Arc}; + +/// Generates the events of the `chainHead_follow` method. +pub struct ChainHeadFollower { + /// Substrate client. + client: Arc, + /// Backend of the chain. + backend: Arc, + /// Subscription handle. + sub_handle: SubscriptionHandle, + /// Subscription was started with the runtime updates flag. + runtime_updates: bool, + /// Subscription ID. + sub_id: String, + /// The best reported block by this subscription. + best_block_cache: Option, +} + +impl ChainHeadFollower { + /// Create a new [`ChainHeadFollower`]. + pub fn new( + client: Arc, + backend: Arc, + sub_handle: SubscriptionHandle, + runtime_updates: bool, + sub_id: String, + ) -> Self { + Self { client, backend, sub_handle, runtime_updates, sub_id, best_block_cache: None } + } +} + +/// A block notification. +enum NotificationType { + /// The initial events generated from the node's memory. + InitialEvents(Vec>), + /// The new block notification obtained from `import_notification_stream`. + NewBlock(BlockImportNotification), + /// The finalized block notification obtained from `finality_notification_stream`. + Finalized(FinalityNotification), +} + +/// The initial blocks that should be reported or ignored by the chainHead. +#[derive(Clone, Debug)] +struct InitialBlocks { + /// Children of the latest finalized block, for which the `NewBlock` + /// event must be generated. + /// + /// It is a tuple of (block hash, parent hash). + finalized_block_descendants: Vec<(Block::Hash, Block::Hash)>, + /// Blocks that should not be reported as pruned by the `Finalized` event. + /// + /// Substrate database will perform the pruning of height N at + /// the finalization N + 1. We could have the following block tree + /// when the user subscribes to the `follow` method: + /// [A] - [A1] - [A2] - [A3] + /// ^^ finalized + /// - [A1] - [B1] + /// + /// When the A3 block is finalized, B1 is reported as pruned, however + /// B1 was never reported as `NewBlock` (and as such was never pinned). + /// This is because the `NewBlock` events are generated for children of + /// the finalized hash. + pruned_forks: HashSet, +} + +/// The startup point from which chainHead started to generate events. +struct StartupPoint { + /// Best block hash. + pub best_hash: Block::Hash, + /// The head of the finalized chain. + pub finalized_hash: Block::Hash, + /// Last finalized block number. + pub finalized_number: <::Header as HeaderT>::Number, +} + +impl From> for StartupPoint { + fn from(info: Info) -> Self { + StartupPoint:: { + best_hash: info.best_hash, + finalized_hash: info.finalized_hash, + finalized_number: info.finalized_number, + } + } +} + +impl ChainHeadFollower +where + Block: BlockT + 'static, + BE: Backend + 'static, + Client: BlockBackend + + HeaderBackend + + HeaderMetadata + + BlockchainEvents + + CallApiAt + + 'static, +{ + /// Conditionally generate the runtime event of the given block. + fn generate_runtime_event( + &self, + block: Block::Hash, + parent: Option, + ) -> Option { + // No runtime versions should be reported. + if !self.runtime_updates { + return None + } + + let block_rt = match self.client.runtime_version_at(block) { + Ok(rt) => rt, + Err(err) => return Some(err.into()), + }; + + let parent = match parent { + Some(parent) => parent, + // Nothing to compare against, always report. + None => return Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt })), + }; + + let parent_rt = match self.client.runtime_version_at(parent) { + Ok(rt) => rt, + Err(err) => return Some(err.into()), + }; + + // Report the runtime version change. + if block_rt != parent_rt { + Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec: block_rt })) + } else { + None + } + } + + /// Get the in-memory blocks of the client, starting from the provided finalized hash. + fn get_init_blocks_with_forks( + &self, + startup_point: &StartupPoint, + ) -> Result, SubscriptionManagementError> { + let blockchain = self.backend.blockchain(); + let leaves = blockchain.leaves()?; + let finalized = startup_point.finalized_hash; + let mut pruned_forks = HashSet::new(); + let mut finalized_block_descendants = Vec::new(); + let mut unique_descendants = HashSet::new(); + for leaf in leaves { + let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)?; + + let blocks = tree_route.enacted().iter().map(|block| block.hash); + if !tree_route.retracted().is_empty() { + pruned_forks.extend(blocks); + } else { + // Ensure a `NewBlock` event is generated for all children of the + // finalized block. Describe the tree route as (child_node, parent_node) + // Note: the order of elements matters here. + let parents = std::iter::once(finalized).chain(blocks.clone()); + + for pair in blocks.zip(parents) { + if unique_descendants.insert(pair) { + finalized_block_descendants.push(pair); + } + } + } + } + + Ok(InitialBlocks { finalized_block_descendants, pruned_forks }) + } + + /// Generate the initial events reported by the RPC `follow` method. + /// + /// Returns the initial events that should be reported directly, together with pruned + /// block hashes that should be ignored by the `Finalized` event. + fn generate_init_events( + &mut self, + startup_point: &StartupPoint, + ) -> Result<(Vec>, HashSet), SubscriptionManagementError> + { + let init = self.get_init_blocks_with_forks(startup_point)?; + + let initial_blocks = init.finalized_block_descendants; + + // The initialized event is the first one sent. + let finalized_block_hash = startup_point.finalized_hash; + self.sub_handle.pin_block(finalized_block_hash)?; + + let finalized_block_runtime = self.generate_runtime_event(finalized_block_hash, None); + + let initialized_event = FollowEvent::Initialized(Initialized { + finalized_block_hash, + finalized_block_runtime, + runtime_updates: self.runtime_updates, + }); + + let mut finalized_block_descendants = Vec::with_capacity(initial_blocks.len() + 1); + + finalized_block_descendants.push(initialized_event); + for (child, parent) in initial_blocks.into_iter() { + self.sub_handle.pin_block(child)?; + + let new_runtime = self.generate_runtime_event(child, Some(parent)); + + let event = FollowEvent::NewBlock(NewBlock { + block_hash: child, + parent_block_hash: parent, + new_runtime, + runtime_updates: self.runtime_updates, + }); + + finalized_block_descendants.push(event); + } + + // Generate a new best block event. + let best_block_hash = startup_point.best_hash; + if best_block_hash != finalized_block_hash { + let best_block = FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }); + self.best_block_cache = Some(best_block_hash); + finalized_block_descendants.push(best_block); + }; + + Ok((finalized_block_descendants, init.pruned_forks)) + } + + /// Generate the "NewBlock" event and potentially the "BestBlockChanged" event for the + /// given block hash. + fn generate_import_events( + &mut self, + block_hash: Block::Hash, + parent_block_hash: Block::Hash, + is_best_block: bool, + ) -> Vec> { + let new_runtime = self.generate_runtime_event(block_hash, Some(parent_block_hash)); + + let new_block = FollowEvent::NewBlock(NewBlock { + block_hash, + parent_block_hash, + new_runtime, + runtime_updates: self.runtime_updates, + }); + + if !is_best_block { + return vec![new_block] + } + + // If this is the new best block, then we need to generate two events. + let best_block_event = + FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash: block_hash }); + + match self.best_block_cache { + Some(block_cache) => { + // The RPC layer has not reported this block as best before. + // Note: This handles the race with the finalized branch. + if block_cache != block_hash { + self.best_block_cache = Some(block_hash); + vec![new_block, best_block_event] + } else { + vec![new_block] + } + }, + None => { + self.best_block_cache = Some(block_hash); + vec![new_block, best_block_event] + }, + } + } + + /// Handle the import of new blocks by generating the appropriate events. + fn handle_import_blocks( + &mut self, + notification: BlockImportNotification, + startup_point: &StartupPoint, + ) -> Result>, SubscriptionManagementError> { + // The block was already pinned by the initial block events or by the finalized event. + if !self.sub_handle.pin_block(notification.hash)? { + return Ok(Default::default()) + } + + // Ensure we are only reporting blocks after the starting point. + let Some(block_number) = self.client.number(notification.hash)? else { + return Err(SubscriptionManagementError::BlockNumberAbsent) + }; + if block_number < startup_point.finalized_number { + return Ok(Default::default()) + } + + Ok(self.generate_import_events( + notification.hash, + *notification.header.parent_hash(), + notification.is_new_best, + )) + } + + /// Generates new block events from the given finalized hashes. + /// + /// It may be possible that the `Finalized` event fired before the `NewBlock` + /// event. In that case, for each finalized hash that was not reported yet + /// generate the `NewBlock` event. For the final finalized hash we must also + /// generate one `BestBlock` event. + fn generate_finalized_events( + &mut self, + finalized_block_hashes: &[Block::Hash], + ) -> Result>, SubscriptionManagementError> { + let mut events = Vec::new(); + + // Nothing to be done if no finalized hashes are provided. + let Some(first_hash) = finalized_block_hashes.get(0) else { + return Ok(Default::default()) + }; + + // Find the parent hash. + let Some(first_number) = self.client.number(*first_hash)? else { + return Err(SubscriptionManagementError::BlockNumberAbsent) + }; + let Some(parent) = self.client.hash(first_number.saturating_sub(One::one()))? else { + return Err(SubscriptionManagementError::BlockHashAbsent) + }; + + let last_finalized = finalized_block_hashes + .last() + .expect("At least one finalized hash inserted; qed"); + let parents = std::iter::once(&parent).chain(finalized_block_hashes.iter()); + for (hash, parent) in finalized_block_hashes.iter().zip(parents) { + // This block is already reported by the import notification. + if !self.sub_handle.pin_block(*hash)? { + continue + } + + // Generate only the `NewBlock` event for this block. + if hash != last_finalized { + events.extend(self.generate_import_events(*hash, *parent, false)); + continue + } + + match self.best_block_cache { + Some(best_block_hash) => { + // If the best reported block is a children of the last finalized, + // then we had a gap in notification. + let ancestor = sp_blockchain::lowest_common_ancestor( + &*self.client, + *last_finalized, + best_block_hash, + )?; + + // A descendent of the finalized block was already reported + // before the `NewBlock` event containing the finalized block + // is reported. + if ancestor.hash == *last_finalized { + return Err(SubscriptionManagementError::Custom( + "A descendent of the finalized block was already reported".into(), + )) + } + self.best_block_cache = Some(*hash); + }, + // This is the first best block event that we generate. + None => { + self.best_block_cache = Some(*hash); + }, + }; + + // This is the first time we see this block. Generate the `NewBlock` event; if this is + // the last block, also generate the `BestBlock` event. + events.extend(self.generate_import_events(*hash, *parent, true)) + } + + Ok(events) + } + + /// Get all pruned block hashes from the provided stale heads. + /// + /// The result does not include hashes from `to_ignore`. + fn get_pruned_hashes( + &self, + stale_heads: &[Block::Hash], + last_finalized: Block::Hash, + to_ignore: &mut HashSet, + ) -> Result, SubscriptionManagementError> { + let blockchain = self.backend.blockchain(); + let mut pruned = Vec::new(); + + for stale_head in stale_heads { + let tree_route = sp_blockchain::tree_route(blockchain, last_finalized, *stale_head)?; + + // Collect only blocks that are not part of the canonical chain. + pruned.extend(tree_route.enacted().iter().filter_map(|block| { + if !to_ignore.remove(&block.hash) { + Some(block.hash) + } else { + None + } + })) + } + + Ok(pruned) + } + + /// Handle the finalization notification by generating the `Finalized` event. + /// + /// If the block of the notification was not reported yet, this method also + /// generates the events similar to `handle_import_blocks`. + fn handle_finalized_blocks( + &mut self, + notification: FinalityNotification, + to_ignore: &mut HashSet, + startup_point: &StartupPoint, + ) -> Result>, SubscriptionManagementError> { + let last_finalized = notification.hash; + + // Ensure we are only reporting blocks after the starting point. + let Some(block_number) = self.client.number(last_finalized)? else { + return Err(SubscriptionManagementError::BlockNumberAbsent) + }; + if block_number < startup_point.finalized_number { + return Ok(Default::default()) + } + + // The tree route contains the exclusive path from the last finalized block to the block + // reported by the notification. Ensure the finalized block is also reported. + let mut finalized_block_hashes = + notification.tree_route.iter().cloned().collect::>(); + finalized_block_hashes.push(last_finalized); + + // If the finalized hashes were not reported yet, generate the `NewBlock` events. + let mut events = self.generate_finalized_events(&finalized_block_hashes)?; + + // Report all pruned blocks from the notification that are not + // part of the fork we need to ignore. + let pruned_block_hashes = + self.get_pruned_hashes(¬ification.stale_heads, last_finalized, to_ignore)?; + + let finalized_event = FollowEvent::Finalized(Finalized { + finalized_block_hashes, + pruned_block_hashes: pruned_block_hashes.clone(), + }); + + match self.best_block_cache { + Some(block_cache) => { + // Check if the current best block is also reported as pruned. + let reported_pruned = pruned_block_hashes.iter().find(|&&hash| hash == block_cache); + if reported_pruned.is_none() { + events.push(finalized_event); + return Ok(events) + } + + // The best block is reported as pruned. Therefore, we need to signal a new + // best block event before submitting the finalized event. + let best_block_hash = self.client.info().best_hash; + if best_block_hash == block_cache { + // The client doest not have any new information about the best block. + // The information from `.info()` is updated from the DB as the last + // step of the finalization and it should be up to date. + // If the info is outdated, there is nothing the RPC can do for now. + error!( + target: LOG_TARGET, + "[follow][id={:?}] Client does not contain different best block", + self.sub_id, + ); + events.push(finalized_event); + Ok(events) + } else { + let ancestor = sp_blockchain::lowest_common_ancestor( + &*self.client, + last_finalized, + best_block_hash, + )?; + + // The client's best block must be a descendent of the last finalized block. + // In other words, the lowest common ancestor must be the last finalized block. + if ancestor.hash != last_finalized { + return Err(SubscriptionManagementError::Custom( + "The finalized block is not an ancestor of the best block".into(), + )) + } + + // The RPC needs to also submit a new best block changed before the + // finalized event. + self.best_block_cache = Some(best_block_hash); + let best_block_event = + FollowEvent::BestBlockChanged(BestBlockChanged { best_block_hash }); + events.extend([best_block_event, finalized_event]); + Ok(events) + } + }, + None => { + events.push(finalized_event); + Ok(events) + }, + } + } + + /// Submit the events from the provided stream to the RPC client + /// for as long as the `rx_stop` event was not called. + async fn submit_events( + &mut self, + startup_point: &StartupPoint, + mut stream: EventStream, + mut to_ignore: HashSet, + mut sink: SubscriptionSink, + rx_stop: oneshot::Receiver<()>, + ) where + EventStream: Stream> + Unpin, + { + let mut stream_item = stream.next(); + let mut stop_event = rx_stop; + + while let Either::Left((Some(event), next_stop_event)) = + futures_util::future::select(stream_item, stop_event).await + { + let events = match event { + NotificationType::InitialEvents(events) => Ok(events), + NotificationType::NewBlock(notification) => + self.handle_import_blocks(notification, &startup_point), + NotificationType::Finalized(notification) => + self.handle_finalized_blocks(notification, &mut to_ignore, &startup_point), + }; + + let events = match events { + Ok(events) => events, + Err(err) => { + debug!( + target: LOG_TARGET, + "[follow][id={:?}] Failed to handle stream notification {:?}", + self.sub_id, + err + ); + let _ = sink.send(&FollowEvent::::Stop); + return + }, + }; + + for event in events { + let result = sink.send(&event); + + // Migration note: the new version of jsonrpsee returns Result<(), DisconnectError> + // The logic from `Err(err)` should be moved when building the new + // `SubscriptionMessage`. + + // For now, jsonrpsee returns: + // Ok(true): message sent + // Ok(false): client disconnected or subscription closed + // Err(err): serder serialization error of the event + if let Err(err) = result { + // Failed to submit event. + debug!( + target: LOG_TARGET, + "[follow][id={:?}] Failed to send event {:?}", self.sub_id, err + ); + + let _ = sink.send(&FollowEvent::::Stop); + return + } + + if let Ok(false) = result { + // Client disconnected or subscription was closed. + return + } + } + + stream_item = stream.next(); + stop_event = next_stop_event; + } + } + + /// Generate the block events for the `chainHead_follow` method. + pub async fn generate_events( + &mut self, + mut sink: SubscriptionSink, + rx_stop: oneshot::Receiver<()>, + ) { + // Register for the new block and finalized notifications. + let stream_import = self + .client + .import_notification_stream() + .map(|notification| NotificationType::NewBlock(notification)); + + let stream_finalized = self + .client + .finality_notification_stream() + .map(|notification| NotificationType::Finalized(notification)); + + let startup_point = StartupPoint::from(self.client.info()); + let (initial_events, pruned_forks) = match self.generate_init_events(&startup_point) { + Ok(blocks) => blocks, + Err(err) => { + debug!( + target: LOG_TARGET, + "[follow][id={:?}] Failed to generate the initial events {:?}", + self.sub_id, + err + ); + let _ = sink.send(&FollowEvent::::Stop); + return + }, + }; + + let initial = NotificationType::InitialEvents(initial_events); + let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized); + let stream = stream::once(futures::future::ready(initial)).chain(merged); + + self.submit_events(&startup_point, stream.boxed(), pruned_forks, sink, rx_stop) + .await; + } +} diff --git a/client/rpc-spec-v2/src/chain_head/mod.rs b/client/rpc-spec-v2/src/chain_head/mod.rs index 3d6d5a916aa75..afa8d3b2189ae 100644 --- a/client/rpc-spec-v2/src/chain_head/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/mod.rs @@ -30,6 +30,7 @@ pub mod chain_head; pub mod error; pub mod event; +mod chain_head_follow; mod subscription; pub use api::ChainHeadApiServer; diff --git a/client/rpc-spec-v2/src/chain_head/subscription.rs b/client/rpc-spec-v2/src/chain_head/subscription.rs index e52336097086d..77d57e747ebc1 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription.rs @@ -19,7 +19,8 @@ //! Subscription management for tracking subscription IDs to pinned blocks. use futures::channel::oneshot; -use parking_lot::{RwLock, RwLockWriteGuard}; +use parking_lot::RwLock; +use sp_blockchain::Error; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, @@ -33,10 +34,22 @@ pub enum SubscriptionManagementError { /// the subscription has exceeded the maximum number /// of blocks pinned. ExceededLimits, + /// Error originated from the blockchain (client or backend). + Blockchain(Error), + /// The database does not contain a block number. + BlockNumberAbsent, + /// The database does not contain a block hash. + BlockHashAbsent, /// Custom error. Custom(String), } +impl From for SubscriptionManagementError { + fn from(err: Error) -> Self { + SubscriptionManagementError::Blockchain(err) + } +} + /// Inner subscription data structure. struct SubscriptionInner { /// The `runtime_updates` parameter flag of the subscription. @@ -53,10 +66,6 @@ struct SubscriptionInner { #[derive(Clone)] pub struct SubscriptionHandle { inner: Arc>>, - /// The best reported block by this subscription. - /// Have this as a separate variable to easily share - /// the write guard with the RPC layer. - best_block: Arc>>, } impl SubscriptionHandle { @@ -69,7 +78,6 @@ impl SubscriptionHandle { blocks: HashSet::new(), max_pinned_blocks, })), - best_block: Arc::new(RwLock::new(None)), } } @@ -125,11 +133,6 @@ impl SubscriptionHandle { let inner = self.inner.read(); inner.runtime_updates } - - /// Get the write guard of the best reported block. - pub fn best_block_write(&self) -> RwLockWriteGuard<'_, Option> { - self.best_block.write() - } } /// Manage block pinning / unpinning for subscription IDs. diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 79be391368f71..0886efa94a756 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -1015,4 +1015,305 @@ async fn follow_prune_best_block() { pruned_block_hashes: vec![format!("{:?}", block_2_hash)], }); assert_eq!(event, expected); + + // Pruned hash can be unpinned. + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + let hash = format!("{:?}", block_2_hash); + let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &hash]).await.unwrap(); +} + +#[tokio::test] +async fn follow_forks_pruned_block() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + // Block tree before the subscription: + // + // finalized -> block 1 -> block 2 -> block 3 + // ^^^ finalized + // -> block 1 -> block 4 -> block 5 + // + + let block_1 = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block_1.clone()).await.unwrap(); + + let block_2 = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block_2.clone()).await.unwrap(); + + let block_3 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_3_hash = block_3.header.hash(); + client.import(BlockOrigin::Own, block_3.clone()).await.unwrap(); + + // Block 4 with parent Block 1 is not the best imported. + let mut block_builder = + client.new_block_at(block_1.header.hash(), Default::default(), false).unwrap(); + // This push is required as otherwise block 4 has the same hash as block 2 and won't get + // imported + block_builder + .push_transfer(Transfer { + from: AccountKeyring::Alice.into(), + to: AccountKeyring::Ferdie.into(), + amount: 41, + nonce: 0, + }) + .unwrap(); + let block_4 = block_builder.build().unwrap().block; + client.import(BlockOrigin::Own, block_4.clone()).await.unwrap(); + + let mut block_builder = + client.new_block_at(block_4.header.hash(), Default::default(), false).unwrap(); + block_builder + .push_transfer(Transfer { + from: AccountKeyring::Bob.into(), + to: AccountKeyring::Ferdie.into(), + amount: 41, + nonce: 0, + }) + .unwrap(); + let block_5 = block_builder.build().unwrap().block; + client.import(BlockOrigin::Own, block_5.clone()).await.unwrap(); + + // Block 4 and 5 are not pruned, pruning happens at height (N - 1). + client.finalize_block(block_3_hash, None).unwrap(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + + // Initialized must always be reported first. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Initialized(Initialized { + finalized_block_hash: format!("{:?}", block_3_hash), + finalized_block_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + // Block tree: + // + // finalized -> block 1 -> block 2 -> block 3 -> block 6 + // ^^^ finalized + // -> block 1 -> block 4 -> block 5 + // + // Mark block 6 as finalized to force block 4 and 5 to get pruned. + + let block_6 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_6_hash = block_6.header.hash(); + client.import(BlockOrigin::Own, block_6.clone()).await.unwrap(); + + client.finalize_block(block_6_hash, None).unwrap(); + + // Check block 6. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_6_hash), + parent_block_hash: format!("{:?}", block_3_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_6_hash), + }); + assert_eq!(event, expected); + + // Block 4 and 5 must not be reported as pruned. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![format!("{:?}", block_6_hash)], + pruned_block_hashes: vec![], + }); + assert_eq!(event, expected); +} + +#[tokio::test] +async fn follow_report_multiple_pruned_block() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + MAX_PINNED_BLOCKS, + ) + .into_rpc(); + + // Block tree: + // + // finalized -> block 1 -> block 2 -> block 3 + // ^^^ finalized after subscription + // -> block 1 -> block 4 -> block 5 + + let finalized_hash = client.info().finalized_hash; + + let block_1 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_1_hash = block_1.header.hash(); + client.import(BlockOrigin::Own, block_1.clone()).await.unwrap(); + + let block_2 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_2_hash = block_2.header.hash(); + client.import(BlockOrigin::Own, block_2.clone()).await.unwrap(); + + let block_3 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_3_hash = block_3.header.hash(); + client.import(BlockOrigin::Own, block_3.clone()).await.unwrap(); + + // Block 4 with parent Block 1 is not the best imported. + let mut block_builder = + client.new_block_at(block_1.header.hash(), Default::default(), false).unwrap(); + // This push is required as otherwise block 4 has the same hash as block 2 and won't get + // imported + block_builder + .push_transfer(Transfer { + from: AccountKeyring::Alice.into(), + to: AccountKeyring::Ferdie.into(), + amount: 41, + nonce: 0, + }) + .unwrap(); + let block_4 = block_builder.build().unwrap().block; + let block_4_hash = block_4.header.hash(); + client.import(BlockOrigin::Own, block_4.clone()).await.unwrap(); + + let mut block_builder = + client.new_block_at(block_4.header.hash(), Default::default(), false).unwrap(); + block_builder + .push_transfer(Transfer { + from: AccountKeyring::Bob.into(), + to: AccountKeyring::Ferdie.into(), + amount: 41, + nonce: 0, + }) + .unwrap(); + let block_5 = block_builder.build().unwrap().block; + let block_5_hash = block_5.header.hash(); + client.import(BlockOrigin::Own, block_5.clone()).await.unwrap(); + let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap(); + + // Initialized must always be reported first. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Initialized(Initialized { + finalized_block_hash: format!("{:?}", finalized_hash), + finalized_block_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_1_hash), + parent_block_hash: format!("{:?}", finalized_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_2_hash), + parent_block_hash: format!("{:?}", block_1_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_3_hash), + parent_block_hash: format!("{:?}", block_2_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + // The fork must also be reported. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_4_hash), + parent_block_hash: format!("{:?}", block_1_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_5_hash), + parent_block_hash: format!("{:?}", block_4_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + + // The best block of the chain must also be reported. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_3_hash), + }); + assert_eq!(event, expected); + + // Block 4 and 5 are not pruned, pruning happens at height (N - 1). + client.finalize_block(block_3_hash, None).unwrap(); + + // Finalizing block 3 directly will also result in block 1 and 2 being finalized. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![ + format!("{:?}", block_1_hash), + format!("{:?}", block_2_hash), + format!("{:?}", block_3_hash), + ], + pruned_block_hashes: vec![], + }); + assert_eq!(event, expected); + + // Block tree: + // + // finalized -> block 1 -> block 2 -> block 3 -> block 6 + // ^^^ finalized + // -> block 1 -> block 4 -> block 5 + // + // Mark block 6 as finalized to force block 4 and 5 to get pruned. + + let block_6 = client.new_block(Default::default()).unwrap().build().unwrap().block; + let block_6_hash = block_6.header.hash(); + client.import(BlockOrigin::Own, block_6.clone()).await.unwrap(); + + client.finalize_block(block_6_hash, None).unwrap(); + + // Check block 6. + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::NewBlock(NewBlock { + block_hash: format!("{:?}", block_6_hash), + parent_block_hash: format!("{:?}", block_3_hash), + new_runtime: None, + runtime_updates: false, + }); + assert_eq!(event, expected); + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::BestBlockChanged(BestBlockChanged { + best_block_hash: format!("{:?}", block_6_hash), + }); + assert_eq!(event, expected); + + // Block 4 and 5 be reported as pruned, not just the stale head (block 5). + let event: FollowEvent = get_next_event(&mut sub).await; + let expected = FollowEvent::Finalized(Finalized { + finalized_block_hashes: vec![format!("{:?}", block_6_hash)], + pruned_block_hashes: vec![format!("{:?}", block_4_hash), format!("{:?}", block_5_hash)], + }); + assert_eq!(event, expected); }