diff --git a/prdoc/pr_6058.prdoc b/prdoc/pr_6058.prdoc new file mode 100644 index 000000000000..5b99467b413f --- /dev/null +++ b/prdoc/pr_6058.prdoc @@ -0,0 +1,18 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: backpressure `chainHead_v1_follow` + +doc: + - audience: Node Operator + description: | + The RPC endpoint `chainHead_v1_follow` now relies on backpressure + to determine whether or not the subscription should be closed instead of continuing to send more events + to a consumer which can't keep up. + This should significantly improve memory consumption as substrate will be keeping fewer messages in memory. + +crates: + - name: sc-rpc-spec-v2 + bump: major + - name: sc-rpc + bump: major diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs index a88e7f2a0b3a..61eb47d1b9ab 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -75,6 +75,8 @@ pub struct ChainHeadConfig { pub max_lagging_distance: usize, /// The maximum number of `chainHead_follow` subscriptions per connection. pub max_follow_subscriptions_per_connection: usize, + /// The maximum number of pending messages per subscription. + pub subscription_buffer_cap: usize, } /// Maximum pinned blocks across all connections. @@ -107,6 +109,7 @@ impl Default for ChainHeadConfig { subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, } } } @@ -126,6 +129,8 @@ pub struct ChainHead, Block: BlockT, Client> { max_lagging_distance: usize, /// Phantom member to pin the block type. _phantom: PhantomData, + /// The maximum number of pending messages per subscription. 
+ subscription_buffer_cap: usize, } impl, Block: BlockT, Client> ChainHead { @@ -148,6 +153,7 @@ impl, Block: BlockT, Client> ChainHead { backend, ), max_lagging_distance: config.max_lagging_distance, + subscription_buffer_cap: config.subscription_buffer_cap, _phantom: PhantomData, } } @@ -196,6 +202,7 @@ where let backend = self.backend.clone(); let client = self.client.clone(); let max_lagging_distance = self.max_lagging_distance; + let subscription_buffer_cap = self.subscription_buffer_cap; let fut = async move { // Ensure the current connection ID has enough space to accept a new subscription. @@ -231,6 +238,7 @@ where with_runtime, sub_id.clone(), max_lagging_distance, + subscription_buffer_cap, ); let result = chain_head_follow.generate_events(sink, sub_data).await; if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result { diff --git a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs index f2326f015677..e9975b36b4a1 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs @@ -28,9 +28,8 @@ use crate::chain_head::{ }; use futures::{ channel::oneshot, - stream::{self, Stream, StreamExt}, + stream::{self, Stream, StreamExt, TryStreamExt}, }; -use futures_util::future::Either; use log::debug; use sc_client_api::{ Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification, @@ -74,6 +73,8 @@ pub struct ChainHeadFollower, Block: BlockT, Client> { /// Stop all subscriptions if the distance between the leaves and the current finalized /// block is larger than this value. max_lagging_distance: usize, + /// The maximum number of pending messages per subscription. 
+ pub subscription_buffer_cap: usize, } struct AnnouncedBlocks { @@ -148,6 +149,7 @@ impl, Block: BlockT, Client> ChainHeadFollower Self { Self { client, @@ -161,6 +163,7 @@ impl, Block: BlockT, Client> ChainHeadFollower( &mut self, startup_point: &StartupPoint, - mut stream: EventStream, + stream: EventStream, sink: Subscription, rx_stop: oneshot::Receiver<()>, ) -> Result<(), SubscriptionManagementError> where - EventStream: Stream> + Unpin, + EventStream: Stream> + Unpin + Send, { - let mut stream_item = stream.next(); - - // The stop event can be triggered by the chainHead logic when the pinned - // block guarantee cannot be hold. Or when the client is disconnected. - let connection_closed = sink.closed(); - tokio::pin!(connection_closed); - let mut stop_event = futures_util::future::select(rx_stop, connection_closed); - - while let Either::Left((Some(event), next_stop_event)) = - futures_util::future::select(stream_item, stop_event).await - { - let events = match event { - NotificationType::InitialEvents(events) => Ok(events), - NotificationType::NewBlock(notification) => - self.handle_import_blocks(notification, &startup_point), - NotificationType::Finalized(notification) => - self.handle_finalized_blocks(notification, &startup_point), - NotificationType::MethodResponse(notification) => Ok(vec![notification]), - }; + let buffer_cap = self.subscription_buffer_cap; + // map each notification to the events it yields, forwarding handler errors + let mut handle_events = |event| match event { + NotificationType::InitialEvents(events) => Ok(events), + NotificationType::NewBlock(notification) => + self.handle_import_blocks(notification, &startup_point), + NotificationType::Finalized(notification) => + self.handle_finalized_blocks(notification, &startup_point), + NotificationType::MethodResponse(notification) => Ok(vec![notification]), + }; - let events = match events { - Ok(events) => events, - Err(err) => { - debug!( - target: LOG_TARGET, - "[follow][id={:?}] Failed to handle stream 
notification {:?}", - self.sub_id, - err - ); - _ = sink.send(&FollowEvent::::Stop).await; - return Err(err) - }, - }; + let stream = stream + .map(|event| handle_events(event)) + .map_ok(|items| stream::iter(items).map(Ok)) + .try_flatten(); + + tokio::pin!(stream); + + let sink_future = + sink.pipe_from_try_stream(stream, sc_rpc::utils::BoundedVecDeque::new(buffer_cap)); - for event in events { - if let Err(err) = sink.send(&event).await { - // Failed to submit event. + let result = tokio::select! { + _ = rx_stop => Ok(()), + result = sink_future => { + if let Err(ref e) = result { debug!( target: LOG_TARGET, - "[follow][id={:?}] Failed to send event {:?}", self.sub_id, err + "[follow][id={:?}] Failed to handle stream notification {:?}", + &self.sub_id, + e ); - - let _ = sink.send(&FollowEvent::::Stop).await; - // No need to propagate this error further, the client disconnected. - return Ok(()) - } + }; + result } - - stream_item = stream.next(); - stop_event = next_stop_event; - } - - // If we got here either: - // - the substrate streams have closed - // - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee) - // - the client disconnected. + }; let _ = sink.send(&FollowEvent::::Stop).await; - Ok(()) + result } /// Generate the block events for the `chainHead_follow` method. 
diff --git a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs index 0c2486157bd0..c505566d887d 100644 --- a/substrate/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/substrate/client/rpc-spec-v2/src/chain_head/tests.rs @@ -44,7 +44,7 @@ use sp_core::{ use sp_runtime::traits::Block as BlockT; use sp_version::RuntimeVersion; use std::{ - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, fmt::Debug, sync::Arc, time::Duration, @@ -86,6 +86,7 @@ pub async fn run_server() -> std::net::SocketAddr { subscription_max_ongoing_operations: MAX_OPERATIONS, max_follow_subscriptions_per_connection: 1, max_lagging_distance: MAX_LAGGING_DISTANCE, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -147,6 +148,7 @@ async fn setup_api() -> ( subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -254,6 +256,7 @@ async fn follow_subscription_produces_blocks() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -323,6 +326,7 @@ async fn follow_with_runtime() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -631,6 +635,7 @@ async fn call_runtime_without_flag() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -1290,6 
+1295,7 @@ async fn separate_operation_ids_for_subscriptions() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -1376,6 +1382,7 @@ async fn follow_generates_initial_blocks() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -1532,6 +1539,7 @@ async fn follow_exceeding_pinned_blocks() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -1609,6 +1617,7 @@ async fn follow_with_unpin() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -1715,6 +1724,7 @@ async fn unpin_duplicate_hashes() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -1818,6 +1828,7 @@ async fn follow_with_multiple_unpin_hashes() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -1963,6 +1974,7 @@ async fn follow_prune_best_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, 
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -2149,6 +2161,7 @@ async fn follow_forks_pruned_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -2309,6 +2322,7 @@ async fn follow_report_multiple_pruned_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -2555,6 +2569,7 @@ async fn pin_block_references() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -2690,6 +2705,7 @@ async fn follow_finalized_before_new_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -2805,6 +2821,7 @@ async fn ensure_operation_limits_works() { subscription_max_ongoing_operations: 1, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -2910,6 +2927,7 @@ async fn storage_is_backpressured() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -3047,6 +3065,7 @@ 
async fn stop_storage_operation() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -3344,6 +3363,7 @@ async fn chain_head_stop_all_subscriptions() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: 5, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -3557,6 +3577,7 @@ async fn chain_head_limit_reached() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: 1, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -3597,6 +3618,7 @@ async fn follow_unique_pruned_blocks() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, max_lagging_distance: MAX_LAGGING_DISTANCE, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -3766,6 +3788,7 @@ async fn follow_report_best_block_of_a_known_block() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_lagging_distance: MAX_LAGGING_DISTANCE, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -3984,6 +4007,7 @@ async fn follow_event_with_unknown_parent() { subscription_max_ongoing_operations: MAX_OPERATIONS, max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, max_lagging_distance: MAX_LAGGING_DISTANCE, + subscription_buffer_cap: MAX_PINNED_BLOCKS, }, ) .into_rpc(); @@ -4033,3 +4057,54 @@ async fn follow_event_with_unknown_parent() { // When importing the block 2, chainHead detects a gap in our blocks and stops. 
assert_matches!(get_next_event::>(&mut sub).await, FollowEvent::Stop); } + +#[tokio::test] +async fn events_are_backpressured() { + let builder = TestClientBuilder::new(); + let backend = builder.backend(); + let client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TokioTestExecutor::default()), + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + max_lagging_distance: MAX_LAGGING_DISTANCE, + max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION, + subscription_buffer_cap: 10, + }, + ) + .into_rpc(); + + let mut parent_hash = client.chain_info().genesis_hash; + let mut header = VecDeque::new(); + let mut sub = api.subscribe("chainHead_v1_follow", [false], 1).await.unwrap(); + + // insert more events than the user can consume + for i in 0..=5 { + let block = BlockBuilderBuilder::new(&*client) + .on_parent_block(parent_hash) + .with_parent_block_number(i) + .build() + .unwrap() + .build() + .unwrap() + .block; + header.push_front(block.header().clone()); + + parent_hash = block.hash(); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + } + + let mut events = Vec::new(); + + while let Some(event) = sub.next::>().await { + events.push(event); + } + + assert_eq!(events.len(), 2); + assert_matches!(events.pop().unwrap().map(|x| x.0), Ok(FollowEvent::Stop)); +} diff --git a/substrate/client/rpc/src/utils.rs b/substrate/client/rpc/src/utils.rs index e2ff04c0baf3..b94f062cddab 100644 --- a/substrate/client/rpc/src/utils.rs +++ b/substrate/client/rpc/src/utils.rs @@ -21,7 +21,7 @@ use crate::SubscriptionTaskExecutor; use futures::{ future::{self, Either, Fuse, FusedFuture}, - Future, FutureExt, Stream, StreamExt, + Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt, }; use jsonrpsee::{ types::SubscriptionId, DisconnectError, 
PendingSubscriptionSink, SubscriptionMessage, @@ -173,14 +173,27 @@ impl From for Subscription { impl Subscription { /// Feed items to the subscription from the underlying stream /// with specified buffer strategy. - pub async fn pipe_from_stream(self, mut stream: S, mut buf: B) + pub async fn pipe_from_stream(&self, stream: S, buf: B) where - S: Stream + Unpin + Send + 'static, - T: Serialize + Send + 'static, + S: Stream + Unpin, + T: Serialize + Send, + B: Buffer, + { + self.pipe_from_try_stream(stream.map(Ok::), buf) + .await + .expect("No Err will be ever encountered.qed"); + } + + /// Feed items to the subscription from the underlying stream + /// with specified buffer strategy. + pub async fn pipe_from_try_stream(&self, mut stream: S, mut buf: B) -> Result<(), E> + where + S: TryStream + Unpin, + T: Serialize + Send, B: Buffer, { let mut next_fut = Box::pin(Fuse::terminated()); - let mut next_item = stream.next(); + let mut next_item = stream.try_next(); let closed = self.0.closed(); futures::pin_mut!(closed); @@ -201,7 +214,7 @@ impl Subscription { next_fut = Box::pin(Fuse::terminated()); }, // New item from the stream - Either::Right((Either::Right((Some(v), n)), c)) => { + Either::Right((Either::Right((Ok(Some(v)), n)), c)) => { if buf.push(v).is_err() { log::debug!( target: "rpc", @@ -209,31 +222,35 @@ impl Subscription { self.0.method_name(), self.0.connection_id().0 ); - return + return Ok(()); } next_fut = n; closed = c; - next_item = stream.next(); + next_item = stream.try_next(); }, + // Error occurred while processing the stream. + // + // terminate the stream. + Either::Right((Either::Right((Err(e), _)), _)) => return Err(e), // Stream "finished". // // Process remaining items and terminate. 
- Either::Right((Either::Right((None, pending_fut)), _)) => { + Either::Right((Either::Right((Ok(None), pending_fut)), _)) => { if !pending_fut.is_terminated() && pending_fut.await.is_err() { - return; + return Ok(()); } while let Some(v) = buf.pop() { if self.send(&v).await.is_err() { - return; + return Ok(()); } } - return; + return Ok(()); }, // Subscription was closed. - Either::Left(_) => return, + Either::Left(_) => return Ok(()), } } }