Skip to content

Commit

Permalink
Avoid using extra background task for polling subscription when possi…
Browse files Browse the repository at this point in the history
…ble (#2206)

* Avoid using extra background task for TransactionTracker

* Add docs
  • Loading branch information
serban300 authored and bkontur committed May 22, 2024
1 parent 7faaeca commit b438560
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 110 deletions.
2 changes: 1 addition & 1 deletion bridges/relays/client-substrate/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ mod rpc_api;
mod subscription;

pub use client::Client;
pub use subscription::{SharedSubscriptionFactory, Subscription};
pub use subscription::{SharedSubscriptionFactory, Subscription, UnderlyingSubscription};

/// Type of RPC client with caching support.
pub type RpcWithCachingClient<C> = CachingClient<C, RpcClient<C>>;
Expand Down
29 changes: 13 additions & 16 deletions bridges/relays/client-substrate/src/client/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ use crate::{
SubstrateFrameSystemClient, SubstrateGrandpaClient, SubstrateStateClient,
SubstrateSystemClient,
},
subscription::{Subscription, Unwrap},
Client,
},
error::{Error, Result},
guard::Environment,
transaction_stall_timeout, AccountIdOf, AccountKeyPairOf, BalanceOf, BlockNumberOf, Chain,
ChainRuntimeVersion, ChainWithGrandpa, ChainWithTransactions, ConnectionParams, HashOf,
HeaderIdOf, HeaderOf, NonceOf, SignParam, SignedBlockOf, SimpleRuntimeVersion, Subscription,
HeaderIdOf, HeaderOf, NonceOf, SignParam, SignedBlockOf, SimpleRuntimeVersion,
TransactionTracker, UnsignedTransaction,
};

Expand Down Expand Up @@ -495,26 +496,22 @@ impl<C: Chain> Client<C> for RpcClient<C> {
);
let signed_extrinsic = C::sign_transaction(signing_data, extrinsic)?.encode();
let tx_hash = C::Hasher::hash(&signed_extrinsic);
let subscription = SubstrateAuthorClient::<C>::submit_and_watch_extrinsic(
&*client,
Bytes(signed_extrinsic),
)
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
let subscription: jsonrpsee::core::client::Subscription<_> =
SubstrateAuthorClient::<C>::submit_and_watch_extrinsic(
&*client,
Bytes(signed_extrinsic),
)
.await
.map_err(|e| {
log::error!(target: "bridge", "Failed to send transaction to {} node: {:?}", C::NAME, e);
e
})?;
log::trace!(target: "bridge", "Sent transaction to {} node: {:?}", C::NAME, tx_hash);
Ok(TransactionTracker::new(
self_clone,
stall_timeout,
tx_hash,
Subscription::new(
C::NAME.into(),
"transaction events".into(),
Box::new(subscription.map_err(Into::into)),
)
.await?,
Box::new(Unwrap::new(C::NAME.into(), "transaction events".into(), subscription)),
))
})
.await
Expand Down
159 changes: 77 additions & 82 deletions bridges/relays/client-substrate/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,72 @@ use async_std::{
channel::{bounded, Receiver, Sender},
stream::StreamExt,
};
use futures::{future::FutureExt, Stream};
use futures::{FutureExt, Stream};
use sp_runtime::DeserializeOwned;
use std::future::Future;
use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
};

/// Once channel reaches this capacity, the subscription breaks.
const CHANNEL_CAPACITY: usize = 128;

/// Underlying subscription type.
pub type UnderlyingSubscription<T> = Box<dyn Stream<Item = Result<T>> + Unpin + Send>;
pub type UnderlyingSubscription<T> = Box<dyn Stream<Item = T> + Unpin + Send>;

/// Chainable stream that transforms items of type `Result<T, E>` to items of type `T`.
///
/// If it encounters an item of type `Err`, it returns `Poll::Ready(None)`
/// and terminates the underlying stream.
pub struct Unwrap<S: Stream<Item = std::result::Result<T, E>>, T, E> {
chain_name: String,
item_type: String,
subscription: Option<S>,
}

impl<S: Stream<Item = std::result::Result<T, E>>, T, E> Unwrap<S, T, E> {
/// Create a new instance of `Unwrap`.
pub fn new(chain_name: String, item_type: String, subscription: S) -> Self {
Self { chain_name, item_type, subscription: Some(subscription) }
}
}

impl<S: Stream<Item = std::result::Result<T, E>> + Unpin, T: DeserializeOwned, E: Debug> Stream
for Unwrap<S, T, E>
{
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(match self.subscription.as_mut() {
Some(subscription) => match futures::ready!(Pin::new(subscription).poll_next(cx)) {
Some(Ok(item)) => Some(item),
Some(Err(e)) => {
self.subscription.take();
log::debug!(
target: "bridge",
"{} stream of {} has returned error: {:?}. It may need to be restarted",
self.item_type,
self.chain_name,
e,
);
None
},
None => {
self.subscription.take();
log::debug!(
target: "bridge",
"{} stream of {} has returned `None`. It may need to be restarted",
self.item_type,
self.chain_name,
);
None
},
},
None => None,
})
}
}

/// Subscription factory that produces subscriptions, sharing the same background thread.
#[derive(Clone)]
Expand All @@ -41,13 +98,13 @@ impl<T: 'static + Clone + DeserializeOwned + Send> SharedSubscriptionFactory<T>
pub async fn new(
chain_name: String,
item_type: String,
subscribe: impl Future<Output = Result<UnderlyingSubscription<T>>> + Send + 'static,
subscription: UnderlyingSubscription<std::result::Result<T, jsonrpsee::core::ClientError>>,
) -> Self {
let (subscribers_sender, subscribers_receiver) = bounded(CHANNEL_CAPACITY);
async_std::task::spawn(background_worker(
chain_name,
item_type,
subscribe,
chain_name.clone(),
item_type.clone(),
Box::new(Unwrap::new(chain_name, item_type, subscription)),
subscribers_receiver,
));
Self { subscribers_sender }
Expand All @@ -73,16 +130,12 @@ impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
pub async fn new(
chain_name: String,
item_type: String,
subscription: UnderlyingSubscription<T>,
subscription: UnderlyingSubscription<std::result::Result<T, jsonrpsee::core::ClientError>>,
) -> Result<Self> {
SharedSubscriptionFactory::<T>::new(
chain_name,
item_type,
futures::future::ready(Ok(subscription)),
)
.await
.subscribe()
.await
SharedSubscriptionFactory::<T>::new(chain_name, item_type, subscription)
.await
.subscribe()
.await
}

/// Return subscription factory for this subscription.
Expand All @@ -91,7 +144,7 @@ impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
}

/// Consumes subscription and returns future items stream.
pub fn into_stream(self) -> impl futures::Stream<Item = T> {
pub fn into_stream(self) -> impl Stream<Item = T> {
futures::stream::unfold(self, |mut this| async {
let item = this.items_receiver.next().await.unwrap_or(None);
item.map(|i| (i, this))
Expand All @@ -112,7 +165,7 @@ impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
async fn background_worker<T: 'static + Clone + DeserializeOwned + Send>(
chain_name: String,
item_type: String,
subscribe: impl Future<Output = Result<UnderlyingSubscription<T>>> + Send + 'static,
mut subscription: UnderlyingSubscription<T>,
mut subscribers_receiver: Receiver<Sender<Option<T>>>,
) {
fn log_task_exit(chain_name: &str, item_type: &str, reason: &str) {
Expand All @@ -125,57 +178,6 @@ async fn background_worker<T: 'static + Clone + DeserializeOwned + Send>(
);
}

async fn notify_subscribers<T: Clone>(
chain_name: &str,
item_type: &str,
subscribers: &mut Vec<Sender<Option<T>>>,
result: Option<Result<T>>,
) {
let result_to_send = match result {
Some(Ok(item)) => Some(item),
Some(Err(e)) => {
log::debug!(
target: "bridge",
"{} stream of {} has returned error: {:?}. It may need to be restarted",
item_type,
chain_name,
e,
);
None
},
None => {
log::debug!(
target: "bridge",
"{} stream of {} has returned `None`. It may need to be restarted",
item_type,
chain_name,
);
None
},
};

let mut i = 0;
while i < subscribers.len() {
let result_to_send = result_to_send.clone();
let send_result = subscribers[i].try_send(result_to_send);
match send_result {
Ok(_) => {
i += 1;
},
Err(_) => {
subscribers.swap_remove(i);
},
}
}
}

log::trace!(
target: "bridge",
"Starting background task for {} {} subscription stream.",
chain_name,
item_type,
);

// wait for first subscriber until actually starting subscription
let subscriber = match subscribers_receiver.next().await {
Some(subscriber) => subscriber,
Expand All @@ -188,16 +190,6 @@ async fn background_worker<T: 'static + Clone + DeserializeOwned + Send>(

// actually subscribe
let mut subscribers = vec![subscriber];
let mut jsonrpsee_subscription = match subscribe.await {
Ok(jsonrpsee_subscription) => jsonrpsee_subscription,
Err(e) => {
let reason = format!("failed to subscribe: {:?}", e);
notify_subscribers(&chain_name, &item_type, &mut subscribers, Some(Err(e))).await;

// we cant't do anything without underlying subscription, so let's exit
return log_task_exit(&chain_name, &item_type, &reason)
},
};

// start listening for new items and receivers
loop {
Expand All @@ -212,10 +204,13 @@ async fn background_worker<T: 'static + Clone + DeserializeOwned + Send>(
},
}
},
item = jsonrpsee_subscription.next().fuse() => {
item = subscription.next().fuse() => {
let is_stream_finished = item.is_none();
let item = item.map(|r| r.map_err(Into::into));
notify_subscribers(&chain_name, &item_type, &mut subscribers, item).await;
// notify subscribers
subscribers.retain(|subscriber| {
let send_result = subscriber.try_send(item.clone());
send_result.is_ok()
});

// it means that the underlying client has dropped, so we can't do anything here
// and need to stop the task
Expand Down
20 changes: 9 additions & 11 deletions bridges/relays/client-substrate/src/transaction_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

//! Helper for tracking transaction invalidation events.

use crate::{Chain, Error, HashOf, HeaderIdOf, Subscription, TransactionStatusOf};
use crate::{
client::UnderlyingSubscription, Chain, Error, HashOf, HeaderIdOf, TransactionStatusOf,
};

use async_trait::async_trait;
use futures::{future::Either, Future, FutureExt, Stream, StreamExt};
Expand Down Expand Up @@ -64,7 +66,7 @@ pub struct TransactionTracker<C: Chain, E> {
environment: E,
transaction_hash: HashOf<C>,
stall_timeout: Duration,
subscription: Subscription<TransactionStatusOf<C>>,
subscription: UnderlyingSubscription<TransactionStatusOf<C>>,
}

impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
Expand All @@ -73,7 +75,7 @@ impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
environment: E,
stall_timeout: Duration,
transaction_hash: HashOf<C>,
subscription: Subscription<TransactionStatusOf<C>>,
subscription: UnderlyingSubscription<TransactionStatusOf<C>>,
) -> Self {
Self { environment, stall_timeout, transaction_hash, subscription }
}
Expand Down Expand Up @@ -105,7 +107,7 @@ impl<C: Chain, E: Environment<C>> TransactionTracker<C, E> {
let wait_for_invalidation = watch_transaction_status::<_, C, _>(
self.environment,
self.transaction_hash,
self.subscription.into_stream(),
self.subscription,
);
futures::pin_mut!(wait_for_stall_timeout, wait_for_invalidation);

Expand Down Expand Up @@ -328,17 +330,15 @@ mod tests {
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription::new("test".into(), "test".into(), Box::new(receiver))
.await
.unwrap(),
Box::new(receiver),
);

// we can't do `.now_or_never()` on `do_wait()` call, because `Subscription` has its own
// background thread, which may cause additional async task switches => let's leave some
// relatively small timeout here
let wait_for_stall_timeout = async_std::task::sleep(std::time::Duration::from_millis(100));
let wait_for_stall_timeout_rest = futures::future::ready(());
sender.send(Ok(status)).await.unwrap();
sender.send(status).await.unwrap();

let (ts, is) =
tx_tracker.do_wait(wait_for_stall_timeout, wait_for_stall_timeout_rest).await;
Expand Down Expand Up @@ -455,9 +455,7 @@ mod tests {
TestEnvironment(Ok(HeaderId(0, Default::default()))),
Duration::from_secs(0),
Default::default(),
Subscription::new("test".into(), "test".into(), Box::new(receiver))
.await
.unwrap(),
Box::new(receiver),
);

let wait_for_stall_timeout = futures::future::ready(()).shared();
Expand Down

0 comments on commit b438560

Please sign in to comment.