diff --git a/src/bin/ws_robust_subs_trades.rs b/src/bin/ws_robust_subs_trades.rs index 407dc30..040d5e6 100644 --- a/src/bin/ws_robust_subs_trades.rs +++ b/src/bin/ws_robust_subs_trades.rs @@ -1,7 +1,7 @@ use hyperliquid_rust_sdk::robust::Subs; use hyperliquid_rust_sdk::{BaseUrl, Subscription}; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio::time::{sleep, Duration}; /// Stream trades for BTC/USD using the subscription helper @@ -9,7 +9,7 @@ use tokio::time::{sleep, Duration}; async fn main() { env_logger::init(); - let (cancel_tx, cancel_rx) = oneshot::channel(); + let (cancel_tx, cancel_rx) = mpsc::channel(1); let subs = Arc::new(Subs::connect(&BaseUrl::Mainnet, cancel_rx).await.unwrap()); diff --git a/src/ws/robust/subs.rs b/src/ws/robust/subs.rs index 67a2b04..4fed97b 100644 --- a/src/ws/robust/subs.rs +++ b/src/ws/robust/subs.rs @@ -4,7 +4,7 @@ use crate::BaseUrl; use anyhow::Result; use log::{debug, error, trace}; use std::sync::{atomic::AtomicU32, Arc}; -use tokio::sync::{mpsc, oneshot, Mutex, RwLock}; +use tokio::sync::{mpsc, Mutex, RwLock}; type Topic = super::super::ws_manager::Subscription; @@ -26,7 +26,7 @@ impl Drop for SubToken { impl Subs { pub async fn connect( base_url: &BaseUrl, - cancel_rx: oneshot::Receiver<()>, + mut cancel_rx: mpsc::Receiver<()>, ) -> Result> { let (message_tx, message_rx) = mpsc::unbounded_channel(); @@ -44,7 +44,7 @@ impl Subs { _ = subs.start(message_rx) => { trace!("Subs has ended"); }, - _ = cancel_rx => { + _ = cancel_rx.recv() => { trace!("Received cancel command"); if let Err(e) = stream_cancel_tx.send(()) {