rpc v2: backpressure chainhead_v1_follow #6058

Merged · 15 commits · Oct 18, 2024
18 changes: 18 additions & 0 deletions prdoc/pr_6058.prdoc
@@ -0,0 +1,18 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: backpressure `chainhead_v1_follow`

doc:
- audience: Node Operator
description: |
The RPC endpoint `chainHead_v1_follow` now relies on backpressure
to determine whether the subscription should be closed, instead of continuing
to send events to a consumer that can't keep up.
This should significantly improve memory consumption, as Substrate will keep fewer messages in memory.

crates:
- name: sc-rpc-spec-v2
bump: major
- name: sc-rpc
bump: major
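To make "relies on backpressure" concrete, here is a minimal, generic sketch of the idea using a bounded tokio mpsc channel. It is not the sc-rpc implementation; the names and the buffer size are purely illustrative.

```rust
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Bounded channel: once 16 events are queued, `send` waits for the
    // consumer instead of letting the queue grow without limit.
    let (tx, mut rx) = mpsc::channel::<u32>(16);

    let producer = tokio::spawn(async move {
        for event in 0..1_000u32 {
            // Backpressure point: this await parks the producer while the
            // buffer is full. A server could instead time out here and close
            // the subscription of a consumer that never catches up.
            if tx.send(event).await.is_err() {
                break; // consumer went away
            }
        }
    });

    // Deliberately slow consumer.
    while let Some(event) = rx.recv().await {
        if event % 200 == 0 {
            println!("processed {event}");
        }
        sleep(Duration::from_micros(50)).await;
    }

    let _ = producer.await;
}
```

The point is that a slow consumer slows the producer down (or gets disconnected) instead of forcing the server to buffer an unbounded backlog per subscription.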
8 changes: 8 additions & 0 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
@@ -75,6 +75,8 @@ pub struct ChainHeadConfig {
pub max_lagging_distance: usize,
/// The maximum number of `chainHead_follow` subscriptions per connection.
pub max_follow_subscriptions_per_connection: usize,
/// The maximum number of pending messages per subscription.
pub subscription_buffer_cap: usize,
}

/// Maximum pinned blocks across all connections.
@@ -107,6 +109,7 @@ impl Default for ChainHeadConfig {
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
subscription_buffer_cap: MAX_PINNED_BLOCKS,
}
}
}
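Since `ChainHeadConfig` keeps a `Default` implementation, an embedder only needs to override the new field. A hedged sketch; the import path and the value 512 are assumptions and may not match the crate exactly.

```rust
// Sketch only: override the new buffer cap and keep every other field at its
// default. The import path is assumed; 512 is an illustrative value.
use sc_rpc_spec_v2::chain_head::ChainHeadConfig;

fn chain_head_config() -> ChainHeadConfig {
    ChainHeadConfig {
        // Maximum number of pending messages buffered per `chainHead_follow`
        // subscription before backpressure applies (`subscription_buffer_cap` above).
        subscription_buffer_cap: 512,
        ..Default::default()
    }
}
```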
@@ -126,6 +129,8 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
max_lagging_distance: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
/// The maximum number of pending messages per subscription.
subscription_buffer_cap: usize,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
@@ -148,6 +153,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend,
),
max_lagging_distance: config.max_lagging_distance,
subscription_buffer_cap: config.subscription_buffer_cap,
_phantom: PhantomData,
}
}
@@ -196,6 +202,7 @@ where
let backend = self.backend.clone();
let client = self.client.clone();
let max_lagging_distance = self.max_lagging_distance;
let subscription_buffer_cap = self.subscription_buffer_cap;

let fut = async move {
// Ensure the current connection ID has enough space to accept a new subscription.
@@ -231,6 +238,7 @@ where
with_runtime,
sub_id.clone(),
max_lagging_distance,
subscription_buffer_cap,
);
let result = chain_head_follow.generate_events(sink, sub_data).await;
if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
92 changes: 37 additions & 55 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
@@ -28,9 +28,8 @@ use crate::chain_head::{
};
use futures::{
channel::oneshot,
stream::{self, Stream, StreamExt},
stream::{self, Stream, StreamExt, TryStreamExt},
};
use futures_util::future::Either;
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
@@ -74,6 +73,8 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
max_lagging_distance: usize,
/// The maximum number of pending messages per subscription.
pub subscription_buffer_cap: usize,
}

struct AnnouncedBlocks<Block: BlockT> {
@@ -148,6 +149,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
with_runtime: bool,
sub_id: String,
max_lagging_distance: usize,
subscription_buffer_cap: usize,
) -> Self {
Self {
client,
@@ -161,6 +163,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
)),
announced_blocks: AnnouncedBlocks::new(),
max_lagging_distance,
subscription_buffer_cap,
}
}
}
@@ -705,71 +708,50 @@ where
async fn submit_events<EventStream>(
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
stream: EventStream,
sink: Subscription,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
EventStream: Stream<Item = NotificationType<Block>> + Unpin + Send,
{
let mut stream_item = stream.next();

// The stop event can be triggered by the chainHead logic when the pinned
// block guarantee cannot be held, or when the client is disconnected.
let connection_closed = sink.closed();
tokio::pin!(connection_closed);
let mut stop_event = futures_util::future::select(rx_stop, connection_closed);

while let Either::Left((Some(event), next_stop_event)) =
futures_util::future::select(stream_item, stop_event).await
{
let events = match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};
let buffer_cap = self.subscription_buffer_cap;
// create a channel to propagate error messages
let mut handle_events = |event| match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};

let events = match events {
Ok(events) => events,
Err(err) => {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to handle stream notification {:?}",
self.sub_id,
err
);
_ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};
let stream = stream
	.map(|event| handle_events(event))
	.map_ok(|items| stream::iter(items).map(Ok))
	.try_flatten();

niklasad1 (Member) commented on Oct 15, 2024:
It's a bit tempting to add something similar to `pipe_from_stream` that takes a `TryStream`, i.e. one that produces `Result<T, Error>` and bails on the first error, similar to how `try_for_each` works, to avoid injecting a channel here just to get the error back and then quit.
Nothing that needs to be addressed in this PR, but it would be neat :)

pkhry (Contributor, Author) replied:
Added to the PR.
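A rough, standalone sketch of the kind of helper suggested above: drive a fallible stream and bail on the first error, in the spirit of `try_for_each`. The actual `pipe_from_try_stream` added by this PR lives in `sc-rpc` and its real signature differs; everything below is illustrative.

```rust
// Illustrative helper, not the sc-rpc API: forward items from a fallible
// stream into a send callback, stopping at the first error from either side.
use futures::{Stream, TryStreamExt};
use std::future::Future;

async fn pipe_try_stream<S, T, E, F, Fut>(mut stream: S, mut send: F) -> Result<(), E>
where
    S: Stream<Item = Result<T, E>> + Unpin,
    F: FnMut(T) -> Fut,
    Fut: Future<Output = Result<(), E>>,
{
    // `try_next` surfaces the stream's own errors; `send(..).await?` stops as
    // soon as the subscriber can no longer be written to.
    while let Some(item) = stream.try_next().await? {
        send(item).await?;
    }
    Ok(())
}
```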

tokio::pin!(stream);

let sink_future =
sink.pipe_from_try_stream(stream, sc_rpc::utils::BoundedVecDeque::new(buffer_cap));

for event in events {
if let Err(err) = sink.send(&event).await {
// Failed to submit event.
let result = tokio::select! {
_ = rx_stop => Ok(()),
result = sink_future => {
if let Err(ref e) = result {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
"[follow][id={:?}] Failed to handle stream notification {:?}",
&self.sub_id,
e
);

let _ = sink.send(&FollowEvent::<String>::Stop).await;
// No need to propagate this error further, the client disconnected.
return Ok(())
}
};
result
}

stream_item = stream.next();
stop_event = next_stop_event;
}

// If we got here either:
// - the substrate streams have closed
// - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee)
// - the client disconnected.
};
let _ = sink.send(&FollowEvent::<String>::Stop).await;
Ok(())
result
}
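Putting the pieces together, the new `submit_events` maps every notification to a `Result<Vec<event>, _>`, flattens that into one fallible stream, pipes it into the subscription sink through a bounded buffer, and races it against the stop receiver. Below is a self-contained toy reproducing the `map_ok`/`try_flatten` shape and the `tokio::select!` exit paths; only the `futures` and `tokio` crates are assumed, and the event values are made up.

```rust
use futures::stream::{self, StreamExt, TryStreamExt};
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let notifications = stream::iter(vec![1u32, 2, 3]);

    // Each notification becomes Ok(Vec<event>) or Err(..); the vectors are
    // then flattened into one fallible stream of events, like `handle_events`.
    let events = notifications
        .map(|n| if n == 3 { Err("failed to handle notification") } else { Ok(vec![n, n * 10]) })
        .map_ok(|items| stream::iter(items).map(Ok))
        .try_flatten();
    tokio::pin!(events);

    // Kept alive so the stop signal never fires in this toy.
    let (_stop_tx, stop_rx) = oneshot::channel::<()>();

    // Drain events until the stop signal fires, the stream ends, or an error
    // bubbles up -- the same three exit paths as `submit_events`.
    let result = tokio::select! {
        _ = stop_rx => Ok(()),
        res = async {
            while let Some(event) = events.next().await {
                println!("event: {}", event?);
            }
            Ok::<(), &str>(())
        } => res,
    };

    println!("finished with {result:?}");
}
```

Running it prints events 1, 10, 2 and 20, then ends with the error produced for the third notification, mirroring how a single failed notification now terminates the subscription.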

/// Generate the block events for the `chainHead_follow` method.