Skip to content

Commit

Permalink
use mpsc for cancel channel
Browse files Browse the repository at this point in the history
  • Loading branch information
abrkn committed Aug 17, 2024
1 parent d2db7ed commit 029ad46
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
4 changes: 2 additions & 2 deletions src/bin/ws_robust_subs_trades.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use hyperliquid_rust_sdk::robust::Subs;
use hyperliquid_rust_sdk::{BaseUrl, Subscription};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

/// Stream trades for BTC/USD using the subscription helper
#[tokio::main]
async fn main() {
env_logger::init();

let (cancel_tx, cancel_rx) = oneshot::channel();
let (cancel_tx, cancel_rx) = mpsc::channel(1);

let subs = Arc::new(Subs::connect(&BaseUrl::Mainnet, cancel_rx).await.unwrap());

Expand Down
6 changes: 3 additions & 3 deletions src/ws/robust/subs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::BaseUrl;
use anyhow::Result;
use log::{debug, error, trace};
use std::sync::{atomic::AtomicU32, Arc};
use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
use tokio::sync::{mpsc, Mutex, RwLock};

type Topic = super::super::ws_manager::Subscription;

Expand All @@ -26,7 +26,7 @@ impl Drop for SubToken {
impl Subs {
pub async fn connect(
base_url: &BaseUrl,
cancel_rx: oneshot::Receiver<()>,
mut cancel_rx: mpsc::Receiver<()>,
) -> Result<Arc<Self>> {
let (message_tx, message_rx) = mpsc::unbounded_channel();

Expand All @@ -44,7 +44,7 @@ impl Subs {
_ = subs.start(message_rx) => {
trace!("Subs has ended");
},
_ = cancel_rx => {
_ = cancel_rx.recv() => {
trace!("Received cancel command");

if let Err(e) = stream_cancel_tx.send(()) {
Expand Down

0 comments on commit 029ad46

Please sign in to comment.