Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ws/robust, an alternative to ws_manager #49

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ thiserror = "1.0.44"
tokio = {version = "1.29.1", features = ["full"]}
tokio-tungstenite = {version = "0.20.0", features = ["native-tls"]}
uuid = {version = "1.6.1", features = ["v4"]}

# Dependencies added by ws/robust
anyhow = "1.0"
50 changes: 50 additions & 0 deletions src/bin/ws_robust_stream_candles.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use hyperliquid_rust_sdk::{robust::Stream, BaseUrl, Message, Subscription, SubscriptionSendData};
use std::time::Duration;
use tokio::{spawn, sync::mpsc, time::sleep};

/// Stream 1m ETH/USD candles directly without using any subscription helper
#[tokio::main]
async fn main() {
env_logger::init();

let (inbox_tx, mut inbox_rx) = mpsc::channel(100);

let (stream, handle) = Stream::connect(&BaseUrl::Mainnet, inbox_tx);

stream
.send(
serde_json::to_value(SubscriptionSendData {
method: "subscribe",
subscription: &serde_json::to_value(Subscription::Candle {
coin: "ETH".to_string(),
interval: "1m".to_string(),
})
.unwrap(),
})
.unwrap(),
)
.await
.unwrap();

println!("Streaming ETH/USD 1m candles for 60 seconds...");
println!("volume\topen\thigh\tlow\tclose");

spawn(async move {
sleep(Duration::from_secs(60)).await;

stream.cancel().await;
});

while let Some(message) = inbox_rx.recv().await {
if let Message::Candle(candle) = message {
let data = candle.data;
// vol, open, high, low, close
println!(
"{}\t{}\t{}\t{}\t{}",
data.volume, data.open, data.high, data.low, data.close
);
}
}

handle.await.unwrap().unwrap();
}
51 changes: 51 additions & 0 deletions src/bin/ws_robust_subs_trades.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use hyperliquid_rust_sdk::Message;
use hyperliquid_rust_sdk::{robust::Subs, BaseUrl, Subscription};
use tokio::{
sync::mpsc,
time::{sleep, Duration},
};

/// Stream trades for BTC/USD using the subscription helper
#[tokio::main]
async fn main() {
env_logger::init();

let (subs, handle) = Subs::start(&BaseUrl::Mainnet);

let (sub_tx, mut sub_rx) = mpsc::unbounded_channel();

tokio::select! {
join_result = handle => join_result.unwrap().unwrap(),
_ = async {
while let Some(message) = sub_rx.recv().await {
if let Message::Trades(trades) = message {
for trade in trades.data {
println!("{} {}", trade.side, trade.px);
}
}
}
} => {},
_ = async {
let _sub_token = subs
.add(
Subscription::Trades {
coin: "BTC".to_string(),
},
sub_tx,
)
.await
.unwrap();

println!("Streaming BTC/USD trades for 60 sec...");

sleep(Duration::from_secs(60)).await;

subs.cancel().await;
} => {}
};

// The sub token was dropped here, causing an unsubscribe
println!("Finished streaming. Unsubscribing and exiting in 1 sec...");

sleep(Duration::from_secs(1)).await;
}
8 changes: 7 additions & 1 deletion src/exchange/actions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::exchange::{cancel::CancelRequest, order::OrderRequest};
use crate::exchange::{cancel::CancelRequest, modify::ModifyRequest, order::OrderRequest};
pub(crate) use ethers::{
abi::{encode, ParamType, Tokenizable},
types::{
Expand Down Expand Up @@ -107,6 +107,12 @@ pub struct BulkCancel {
pub cancels: Vec<CancelRequest>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BulkModify {
pub modifies: Vec<ModifyRequest>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BulkCancelCloid {
Expand Down
43 changes: 41 additions & 2 deletions src/exchange/exchange_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use crate::signature::sign_typed_data;
use crate::{
exchange::{
actions::{
ApproveAgent, BulkCancel, BulkOrder, SetReferrer, UpdateIsolatedMargin, UpdateLeverage,
UsdSend,
ApproveAgent, BulkCancel, BulkModify, BulkOrder, SetReferrer, UpdateIsolatedMargin,
UpdateLeverage, UsdSend,
},
cancel::{CancelRequest, CancelRequestCloid},
modify::{ClientModifyRequest, ModifyRequest},
ClientCancelRequest, ClientOrderRequest,
},
helpers::{generate_random_key, next_nonce, uuid_to_hex_string},
Expand Down Expand Up @@ -58,6 +59,8 @@ pub enum Actions {
Order(BulkOrder),
Cancel(BulkCancel),
CancelByCloid(BulkCancelCloid),
#[serde(rename = "batchModify")]
Modify(BulkModify),
ApproveAgent(ApproveAgent),
Withdraw3(Withdraw3),
SpotUser(SpotUser),
Expand Down Expand Up @@ -427,6 +430,42 @@ impl ExchangeClient {
self.post(action, signature, timestamp).await
}

pub async fn modify(
&self,
modify: ClientModifyRequest,
wallet: Option<&LocalWallet>,
) -> Result<ExchangeResponseStatus> {
self.bulk_modify(vec![modify], wallet).await
}

pub async fn bulk_modify(
&self,
modifies: Vec<ClientModifyRequest>,
wallet: Option<&LocalWallet>,
) -> Result<ExchangeResponseStatus> {
let wallet = wallet.unwrap_or(&self.wallet);
let timestamp = next_nonce();

let mut transformed_modifies = Vec::new();
for modify in modifies.into_iter() {
transformed_modifies.push(ModifyRequest {
oid: modify.oid,
order: modify.order.convert(&self.coin_to_asset)?,
});
}

let action = Actions::Modify(BulkModify {
modifies: transformed_modifies,
});
let connection_id = action.hash(timestamp, self.vault_address)?;

let action = serde_json::to_value(&action).map_err(|e| Error::JsonParse(e.to_string()))?;
let is_mainnet = self.http_client.is_mainnet();
let signature = sign_l1_action(wallet, connection_id, is_mainnet)?;

self.post(action, signature, timestamp).await
}

pub async fn cancel_by_cloid(
&self,
cancel: ClientCancelRequestCloid,
Expand Down
2 changes: 2 additions & 0 deletions src/exchange/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ mod actions;
mod cancel;
mod exchange_client;
mod exchange_responses;
mod modify;
mod order;

pub use actions::*;
pub use cancel::{ClientCancelRequest, ClientCancelRequestCloid};
pub use exchange_client::*;
pub use exchange_responses::*;
pub use modify::{ClientModifyRequest, ModifyRequest};
pub use order::{
ClientLimit, ClientOrder, ClientOrderRequest, ClientTrigger, MarketCloseParams,
MarketOrderParams, Order,
Expand Down
13 changes: 13 additions & 0 deletions src/exchange/modify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use super::{order::OrderRequest, ClientOrderRequest};
use serde::{Deserialize, Serialize};

pub struct ClientModifyRequest {
pub oid: u64,
pub order: ClientOrderRequest,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ModifyRequest {
pub oid: u64,
pub order: OrderRequest,
}
2 changes: 1 addition & 1 deletion src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub enum BaseUrl {
}

impl BaseUrl {
pub(crate) fn get_url(&self) -> String {
pub fn get_url(&self) -> String {
match self {
BaseUrl::Localhost => LOCAL_API_URL.to_string(),
BaseUrl::Mainnet => MAINNET_API_URL.to_string(),
Expand Down
17 changes: 17 additions & 0 deletions src/info/info_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ pub enum InfoRequest {
user: H160,
oid: u64,
},
#[serde(rename = "orderStatus")]
OrderStatusByCloid {
user: H160,
oid: String,
},
Meta,
SpotMeta,
SpotMetaAndAssetCtxs,
Expand Down Expand Up @@ -259,6 +264,18 @@ impl InfoClient {
self.send_info_request(input).await
}

pub async fn query_order_by_cloid(
&self,
address: H160,
cloid: String,
) -> Result<OrderStatusResponse> {
let input = InfoRequest::OrderStatusByCloid {
user: address,
oid: cloid,
};
self.send_info_request(input).await
}

pub async fn query_referral_state(&self, address: H160) -> Result<ReferralResponse> {
let input = InfoRequest::Referral { user: address };
self.send_info_request(input).await
Expand Down
4 changes: 3 additions & 1 deletion src/info/response_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct UserFillsResponse {
pub start_position: String,
pub sz: String,
pub time: u64,
pub fee: String,
}

#[derive(serde::Deserialize, Debug)]
Expand Down Expand Up @@ -117,7 +118,8 @@ pub struct CandlesSnapshotResponse {
#[derive(Deserialize, Debug)]
pub struct OrderStatusResponse {
pub status: String,
pub order: OrderInfo,
/// `None` if the order is not found
pub order: Option<OrderInfo>,
}

#[derive(Deserialize, Debug)]
Expand Down
4 changes: 3 additions & 1 deletion src/ws/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod message_types;
pub mod robust;
mod sub_structs;
mod ws_manager;
pub use message_types::*;
pub use robust::*;
pub use sub_structs::*;
pub(crate) use ws_manager::WsManager;
pub use ws_manager::{Message, Subscription};
pub use ws_manager::{Message, Subscription, SubscriptionSendData};
5 changes: 5 additions & 0 deletions src/ws/robust/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod stream;
pub mod subs;

pub use stream::*;
pub use subs::*;
Loading