Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc v2: backpressure chainhead_v1_follow #6058

Merged
merged 15 commits into from
Oct 18, 2024
18 changes: 18 additions & 0 deletions prdoc/pr_6058.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: backpressure `chainhead_v1_follow`

doc:
- audience: Node Operator
description: |
The RPC endpoint `chainHead_v1_follow` now relies on backpressure
to determine whether or not the subscription should be closed instead of continuing to send more events
to a consumer which can't keep up.
This should significantly improve memory consumption as substrate will be keeping less messages in memory.

crates:
- name: sc-rpc-spec-v2
pkhry marked this conversation as resolved.
Show resolved Hide resolved
bump: patch
- name: sc-rpc
bump: patch
pkhry marked this conversation as resolved.
Show resolved Hide resolved
113 changes: 64 additions & 49 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use crate::chain_head::{
use futures::{
channel::oneshot,
stream::{self, Stream, StreamExt},
SinkExt,
};
use futures_util::future::Either;
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
Expand All @@ -53,6 +53,10 @@ use std::{
/// `Initialized` event.
const MAX_FINALIZED_BLOCKS: usize = 16;

/// Maximum amount of events buffered by submit_events
/// before dropping the stream.
const MAX_BUFFERED_EVENTS: usize = 512;
pkhry marked this conversation as resolved.
Show resolved Hide resolved

/// Generates the events of the `chainHead_follow` method.
pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
Expand Down Expand Up @@ -705,71 +709,82 @@ where
async fn submit_events<EventStream>(
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
stream: EventStream,
sink: Subscription,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
where
EventStream: Stream<Item = NotificationType<Block>> + Unpin,
EventStream: Stream<Item = NotificationType<Block>> + Unpin + Send,
{
let mut stream_item = stream.next();

// The stop event can be triggered by the chainHead logic when the pinned
// block guarantee cannot be hold. Or when the client is disconnected.
let connection_closed = sink.closed();
tokio::pin!(connection_closed);
let mut stop_event = futures_util::future::select(rx_stop, connection_closed);

while let Either::Left((Some(event), next_stop_event)) =
futures_util::future::select(stream_item, stop_event).await
{
let events = match event {
// make the stream abortable
let (stream, cancel_handle) = futures::stream::abortable(stream);
pkhry marked this conversation as resolved.
Show resolved Hide resolved

// create a channel to propagate error messages
let (tx_send, mut tx_receive) = futures::channel::mpsc::channel(1);
let mut events = move |event| {
pkhry marked this conversation as resolved.
Show resolved Hide resolved
match event {
NotificationType::InitialEvents(events) => Ok(events),
NotificationType::NewBlock(notification) =>
self.handle_import_blocks(notification, &startup_point),
NotificationType::Finalized(notification) =>
self.handle_finalized_blocks(notification, &startup_point),
NotificationType::MethodResponse(notification) => Ok(vec![notification]),
};

let events = match events {
Ok(events) => events,
Err(err) => {
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to handle stream notification {:?}",
self.sub_id,
err
);
_ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};
}
.inspect_err(|e| {
debug!(
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
target: LOG_TARGET,
"[follow][id={:?}] Failed to handle stream notification {:?}",
&self.sub_id,
e
);
})
};

for event in events {
if let Err(err) = sink.send(&event).await {
// Failed to submit event.
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
);

let _ = sink.send(&FollowEvent::<String>::Stop).await;
// No need to propagate this error further, the client disconnected.
return Ok(())
let stream = stream
Copy link
Member

@niklasad1 niklasad1 Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit tempting to add something similar to pipe_from_stream that takes a TryStream i.,e that produces Result<T, Error> and bails on the first error similar to how try_for_each works to avoid having this channel to be injected here just to get back the error and then quit....

Nothing that needs to be addressed in this PR but would be neat :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added to the pr

.map(move |event| events(event))
.then(|event| {
let tx_send = tx_send.clone();
async move {
let mut tx_send = tx_send.clone();

let result = match event {
Ok(events) => stream::iter(events),
Err(err) => {
tx_send
.send(err)
.await
.expect("mpsc::{Sender, Receiver} are not dropped; qed");
stream::iter(vec![])
},
};
result
}
})
.flatten();

tokio::pin!(stream);
tokio::pin!(rx_stop);
pkhry marked this conversation as resolved.
Show resolved Hide resolved

let sink_future =
sink.pipe_from_stream(stream, sc_rpc::utils::BoundedVecDeque::new(MAX_BUFFERED_EVENTS));

tokio::pin!(sink_future);
pkhry marked this conversation as resolved.
Show resolved Hide resolved
let result = tokio::select! {
err = tx_receive.next() => {
pkhry marked this conversation as resolved.
Show resolved Hide resolved
Err(err.expect("mpsc::{Sender, Receiver} are not dropped; qed"))
}
_ = rx_stop => Ok(()),
_ = sink_future => Ok(()),
};

stream_item = stream.next();
stop_event = next_stop_event;
}
cancel_handle.abort();

// If we got here either:
// - the substrate streams have closed
// - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee)
// - the client disconnected.
// // If we got here either:
pkhry marked this conversation as resolved.
Show resolved Hide resolved
// // - the substrate streams have closed
// // - the `Stop` receiver was triggered internally (cannot hold the pinned block
// guarantee) // - the client disconnected.
let _ = sink.send(&FollowEvent::<String>::Stop).await;
Ok(())
result
}

/// Generate the block events for the `chainHead_follow` method.
Expand Down
52 changes: 51 additions & 1 deletion substrate/client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use sp_core::{
use sp_runtime::traits::Block as BlockT;
use sp_version::RuntimeVersion;
use std::{
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
fmt::Debug,
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -4033,3 +4033,53 @@ async fn follow_event_with_unknown_parent() {
// When importing the block 2, chainHead detects a gap in our blocks and stops.
assert_matches!(get_next_event::<FollowEvent<String>>(&mut sub).await, FollowEvent::Stop);
}

#[tokio::test]
async fn events_are_backpressured() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let client = Arc::new(builder.build());

let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TokioTestExecutor::default()),
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS),
subscription_max_ongoing_operations: MAX_OPERATIONS,
max_lagging_distance: MAX_LAGGING_DISTANCE,
max_follow_subscriptions_per_connection: MAX_FOLLOW_SUBSCRIPTIONS_PER_CONNECTION,
},
)
.into_rpc();

let mut parent_hash = client.chain_info().genesis_hash;
let mut header = VecDeque::new();
let mut sub = api.subscribe("chainHead_v1_follow", [false], 511).await.unwrap();

// insert more events than the user can consume
for i in 0..=512 {
let block = BlockBuilderBuilder::new(&*client)
.on_parent_block(parent_hash)
.with_parent_block_number(i)
.build()
.unwrap()
.build()
.unwrap()
.block;
header.push_front(block.header().clone());

parent_hash = block.hash();
client.import(BlockOrigin::Own, block.clone()).await.unwrap();
}

let mut events = Vec::new();

while let Some(event) = sub.next::<FollowEvent<String>>().await {
events.push(event);
}

assert_eq!(events.len(), 64);
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
assert_matches!(events.pop().unwrap().map(|x| x.0), Ok(FollowEvent::Stop));
}
6 changes: 3 additions & 3 deletions substrate/client/rpc/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ impl From<SubscriptionSink> for Subscription {
impl Subscription {
/// Feed items to the subscription from the underlying stream
/// with specified buffer strategy.
pub async fn pipe_from_stream<S, T, B>(self, mut stream: S, mut buf: B)
pub async fn pipe_from_stream<S, T, B>(&self, mut stream: S, mut buf: B)
where
S: Stream<Item = T> + Unpin + Send + 'static,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok nice catch :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, this was causing the lifetime issue Pavlo mentioned before, nice catch indeed :D

T: Serialize + Send + 'static,
S: Stream<Item = T> + Unpin,
T: Serialize + Send,
B: Buffer<Item = T>,
{
let mut next_fut = Box::pin(Fuse::terminated());
Expand Down
Loading