diff --git a/src/bin/ws_robust_stream_candles.rs b/src/bin/ws_robust_stream_candles.rs index 3e96d83..26c8839 100644 --- a/src/bin/ws_robust_stream_candles.rs +++ b/src/bin/ws_robust_stream_candles.rs @@ -1,4 +1,4 @@ -use hyperliquid_rust_sdk::{robust::Stream, BaseUrl, Subscription, SubscriptionSendData}; +use hyperliquid_rust_sdk::{robust::Stream, BaseUrl, Message, Subscription, SubscriptionSendData}; use std::time::Duration; use tokio::{spawn, sync::mpsc, time::sleep}; @@ -27,24 +27,24 @@ async fn main() { .unwrap(); println!("Streaming ETH/USD 1m candles for 60 seconds..."); + println!("volume\topen\thigh\tlow\tclose"); spawn(async move { sleep(Duration::from_secs(60)).await; - println!("Cancelling stream..."); - stream.cancel().await; - - println!("Cancelled stream"); }); while let Some(message) = inbox_rx.recv().await { - println!("Received message: {:?}", message); + if let Message::Candle(candle) = message { + let data = candle.data; + // vol, open, high, low, close + println!( + "{}\t{}\t{}\t{}\t{}", + data.volume, data.open, data.high, data.low, data.close + ); + } } - println!("Message loop over"); - handle.await.unwrap().unwrap(); - - println!("Unwrapped. Done"); } diff --git a/src/bin/ws_robust_subs_trades.rs b/src/bin/ws_robust_subs_trades.rs index 3ccdfa7..96b4823 100644 --- a/src/bin/ws_robust_subs_trades.rs +++ b/src/bin/ws_robust_subs_trades.rs @@ -1,6 +1,9 @@ +use hyperliquid_rust_sdk::Message; use hyperliquid_rust_sdk::{robust::Subs, BaseUrl, Subscription}; -use tokio::sync::mpsc; -use tokio::time::{sleep, Duration}; +use tokio::{ + sync::mpsc, + time::{sleep, Duration}, +}; /// Stream trades for BTC/USD using the subscription helper #[tokio::main] @@ -12,18 +15,16 @@ async fn main() { let (sub_tx, mut sub_rx) = mpsc::unbounded_channel(); tokio::select! { - result = handle => { - println!("Result branch won the race"); - - result.unwrap().unwrap(); - }, + join_result = handle => join_result.unwrap().unwrap(), _ = async { while let Some(message) = sub_rx.recv().await { - println!("Received message: {:?}", message); + if let Message::Trades(trades) = message { + for trade in trades.data { + println!("{} {}", trade.side, trade.px); + } + } } - } => { - println!("Sub token branch won the race"); - }, + } => {}, _ = async { let _sub_token = subs .add( @@ -35,18 +36,16 @@ async fn main() { .await .unwrap(); - println!("Streaming BTC/USD trades for 120 sec..."); + println!("Streaming BTC/USD trades for 60 sec..."); - sleep(Duration::from_secs(120)).await; + sleep(Duration::from_secs(60)).await; subs.cancel().await; - } => { - println!("Sub token branch won the race"); - } - } + } => {} + }; // The sub token was dropped here, causing an unsubscribe - println!("Finished streaming. Unsubscribing and exiting in 5 sec..."); + println!("Finished streaming. Unsubscribing and exiting in 1 sec..."); - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(1)).await; }