Skip to content

Commit

Permalink
fixup! Add ws/robust, an alternative to ws_manager
Browse files Browse the repository at this point in the history
clean up bin scripts
  • Loading branch information
abrkn committed Aug 23, 2024
1 parent 82891e4 commit f6cf317
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 29 deletions.
20 changes: 10 additions & 10 deletions src/bin/ws_robust_stream_candles.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use hyperliquid_rust_sdk::{robust::Stream, BaseUrl, Subscription, SubscriptionSendData};
use hyperliquid_rust_sdk::{robust::Stream, BaseUrl, Message, Subscription, SubscriptionSendData};
use std::time::Duration;
use tokio::{spawn, sync::mpsc, time::sleep};

Expand Down Expand Up @@ -27,24 +27,24 @@ async fn main() {
.unwrap();

println!("Streaming ETH/USD 1m candles for 60 seconds...");
println!("volume\topen\thigh\tlow\tclose");

spawn(async move {
sleep(Duration::from_secs(60)).await;

println!("Cancelling stream...");

stream.cancel().await;

println!("Cancelled stream");
});

while let Some(message) = inbox_rx.recv().await {
println!("Received message: {:?}", message);
if let Message::Candle(candle) = message {
let data = candle.data;
// vol, open, high, low, close
println!(
"{}\t{}\t{}\t{}\t{}",
data.volume, data.open, data.high, data.low, data.close
);
}
}

println!("Message loop over");

handle.await.unwrap().unwrap();

println!("Unwrapped. Done");
}
37 changes: 18 additions & 19 deletions src/bin/ws_robust_subs_trades.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use hyperliquid_rust_sdk::Message;
use hyperliquid_rust_sdk::{robust::Subs, BaseUrl, Subscription};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
use tokio::{
sync::mpsc,
time::{sleep, Duration},
};

/// Stream trades for BTC/USD using the subscription helper
#[tokio::main]
Expand All @@ -12,18 +15,16 @@ async fn main() {
let (sub_tx, mut sub_rx) = mpsc::unbounded_channel();

tokio::select! {
result = handle => {
println!("Result branch won the race");

result.unwrap().unwrap();
},
join_result = handle => join_result.unwrap().unwrap(),
_ = async {
while let Some(message) = sub_rx.recv().await {
println!("Received message: {:?}", message);
if let Message::Trades(trades) = message {
for trade in trades.data {
println!("{} {}", trade.side, trade.px);
}
}
}
} => {
println!("Sub token branch won the race");
},
} => {},
_ = async {
let _sub_token = subs
.add(
Expand All @@ -35,18 +36,16 @@ async fn main() {
.await
.unwrap();

println!("Streaming BTC/USD trades for 120 sec...");
println!("Streaming BTC/USD trades for 60 sec...");

sleep(Duration::from_secs(120)).await;
sleep(Duration::from_secs(60)).await;

subs.cancel().await;
} => {
println!("Sub token branch won the race");
}
}
} => {}
};

// The sub token was dropped here, causing an unsubscribe
println!("Finished streaming. Unsubscribing and exiting in 5 sec...");
println!("Finished streaming. Unsubscribing and exiting in 1 sec...");

sleep(Duration::from_secs(5)).await;
sleep(Duration::from_secs(1)).await;
}

0 comments on commit f6cf317

Please sign in to comment.