diff --git a/bridges/relays/client-substrate/src/client.rs b/bridges/relays/client-substrate/src/client.rs index cae1aadf821e..8a9b5c767aff 100644 --- a/bridges/relays/client-substrate/src/client.rs +++ b/bridges/relays/client-substrate/src/client.rs @@ -847,16 +847,19 @@ impl Client { } } -impl Drop for Subscription { - fn drop(&mut self) {} -} - impl Subscription { /// Consumes subscription and returns future statuses stream. pub fn into_stream(self) -> impl futures::Stream { - futures::stream::unfold(self, |this| async { + futures::stream::unfold(Some(self), |mut this| async move { + let Some(this) = this.take() else { return None }; let item = this.0.lock().await.next().await.unwrap_or(None); - item.map(|i| (i, this)) + match item { + Some(item) => Some((item, Some(this))), + None => { + let _ = this.1.send(()); + None + }, + } }) } @@ -882,7 +885,6 @@ impl Subscription { item_type, ); -let (cancel_sender, cancel_receiver) = futures::channel::oneshot::channel::<()>(); // TODO: remove me futures::pin_mut!(subscription, cancel_receiver); loop { match futures::future::select(subscription.next(), &mut cancel_receiver).await { @@ -925,7 +927,7 @@ let (cancel_sender, cancel_receiver) = futures::channel::oneshot::channel::<()>( chain_name, item_type, ); - //TODO: uncomment me break; + break; }, } }