Error processing data received by WS manager reader: Reader data not found #27

Open
jackbbhua opened this issue May 26, 2024 · 5 comments


jackbbhua commented May 26, 2024

[2024-05-26T12:59:06Z ERROR hyperliquid_rust_sdk::ws::ws_manager] Error processing data received by WS manager reader: Reader data not found

Hi, I am running cargo run --bin ws_trades, but it shows this error after a few minutes.
Please check. Thanks.

Contributor

Frixoe commented Jul 19, 2024

Hey @jackbbhua, this usually happens due to an issue with your connection or the ws connection breaking/expiring. This does not look like an issue related to the rust sdk.

Whenever you get that error, you should try to unsubscribe and subscribe to the ws type again.

Contributor

abrkn commented Aug 12, 2024

> Hey @jackbbhua, this usually happens due to an issue with your connection or the ws connection breaking/expiring. This does not look like an issue related to the rust sdk.
>
> Whenever you get that error, you should try to unsubscribe and subscribe to the ws type again.

I'm having the same issue, even with a stable connection from AWS ap-northeast-1.

ERROR hyperliquid_rust_sdk::ws::ws_manager] Error processing data received by WS manager reader: Reader data not found

The error in question is

#[error("Reader data not found")]
ReaderDataNotFound,

...which is returned here

async fn parse_and_send_data(
    data: Option<std::result::Result<protocol::Message, tungstenite::Error>>,
    subscriptions: &Arc<Mutex<HashMap<String, Vec<SubscriptionData>>>>,
) -> Result<()> {
    let data = data
        .ok_or(Error::ReaderDataNotFound)?

...which is called here

let reader_fut = async move {
    // TODO: reconnect
    loop {
        let data = reader.next().await;
        if let Err(err) = WsManager::parse_and_send_data(data, &subscriptions_copy).await {
            error!("Error processing data received by WS manager reader: {err}");
        }
    }

As you can see, there is a TODO and no error handling. The error is printed and then ignored, and once the stream has ended, reader.next() keeps returning None, so the loop logs the same error indefinitely. I don't see how the consumer can catch this error and handle it.

I believe the UX could be improved by at least removing Option<> from the data argument of parse_and_send_data and notifying subscribers of the error in the calling function.
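
The suggestion above could look roughly like the following. This is only a minimal, std-only sketch and not the SDK's code: `Event`, `parse_and_send`, and `run_reader` are hypothetical names, frames are modelled as plain strings, and `std::sync::mpsc` stands in for the tokio channels the SDK uses. The point it illustrates is that the frame handler takes a frame directly instead of an `Option`, and that the calling loop stops when the stream ends or fails and tells every subscriber why.

```rust
use std::sync::mpsc::Sender;

/// Hypothetical event type delivered to subscribers (not the SDK's `Message`).
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// A frame received from the socket.
    Data(String),
    /// The reader has stopped; the consumer should reconnect.
    Disconnected(String),
}

/// Forwards one frame to all subscribers. Takes the frame itself, not an `Option`.
fn parse_and_send(frame: &str, subscribers: &[Sender<Event>]) {
    for sub in subscribers {
        // A send error only means this subscriber dropped its receiver.
        let _ = sub.send(Event::Data(frame.to_string()));
    }
}

/// Reads frames until the stream ends or yields an error, then notifies
/// every subscriber instead of logging and looping forever.
pub fn run_reader<I>(frames: I, subscribers: &[Sender<Event>])
where
    I: IntoIterator<Item = Result<String, String>>,
{
    let mut reason = String::from("stream ended");
    for frame in frames {
        match frame {
            Ok(text) => parse_and_send(&text, subscribers),
            Err(err) => {
                reason = err;
                break;
            }
        }
    }
    for sub in subscribers {
        let _ = sub.send(Event::Disconnected(reason.clone()));
    }
}
```

With something like this, a consumer would see a final `Disconnected` event on its own channel and could decide to reconnect, rather than having to infer a dead stream from silence.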

Contributor

abrkn commented Aug 12, 2024

Here's my workaround to re-establish the connection if no 1m candle is received in 30 sec:

/// Continuously store the last price of BTC in `last_price`
async fn store_last_price(last_price: Arc<Mutex<f64>>) {
    loop {
        trace!("Connecting to info stream..");

        let mut info_client = InfoClient::new(None, Some(BaseUrl::Mainnet)).await.unwrap();

        // from https://github.com/hyperliquid-dex/hyperliquid-rust-sdk/blob/master/src/bin/ws_candles.rs
        let (sender, mut receiver) = unbounded_channel();

        let subscription_id = info_client
            .subscribe(
                Subscription::Candle {
                    coin: "BTC".to_string(),
                    interval: "1m".to_string(),
                },
                sender,
            )
            .await
            .unwrap();

        while let Ok(result) = timeout(Duration::from_secs(30), receiver.recv()).await {
            if let Message::Candle(candle) = result.unwrap() {
                trace!("Received candle: {:?}", candle);

                *last_price.lock().unwrap() = candle.data.close.parse::<f64>().unwrap();
            }
        }

        if let Err(e) = info_client.unsubscribe(subscription_id).await {
            error!("Failed to unsubscribe: {}", e);
        }

        warn!("No candle received in 30 seconds, retrying...");
    }
}
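
One caveat with the workaround above: `receiver.recv()` resolves to `None` when the channel is closed, so `result.unwrap()` would panic in that case rather than fall through to the retry. A hedged sketch of telling the two cases apart is shown below, using the blocking `std::sync::mpsc` API as a stand-in for tokio (with tokio, the analogous match would be on `Ok(Some(msg))`, `Ok(None)`, and `Err(_)` from `timeout`). `StopReason` and `drain_until_stalled` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the receive loop stopped (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
pub enum StopReason {
    /// No message arrived within the idle window.
    TimedOut,
    /// Every sender was dropped, so no message can ever arrive.
    ChannelClosed,
}

/// Hands each message to `on_msg` until the channel stalls or closes.
pub fn drain_until_stalled<T>(
    rx: &Receiver<T>,
    idle: Duration,
    mut on_msg: impl FnMut(T),
) -> StopReason {
    loop {
        match rx.recv_timeout(idle) {
            Ok(msg) => on_msg(msg),
            Err(RecvTimeoutError::Timeout) => return StopReason::TimedOut,
            Err(RecvTimeoutError::Disconnected) => return StopReason::ChannelClosed,
        }
    }
}
```

The caller can then log an accurate reason and reconnect in both cases without panicking.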

Contributor

Frixoe commented Aug 15, 2024

@abrkn

Yeah, but this still wouldn't work long term for a connection that needs to remain open 24/7. I've come up with a proper workaround and I'm opening a PR soon. Unsubscribing and resubscribing doesn't work because the reader and pinging threads are still using the old channel and are never closed. So your solution would at some point end up doing two things:

  • Blow up your CPU and (to a lesser extent) memory usage because of the new threads you've just spawned
  • Blow up your storage because of all the logs from the orphaned threads

For your solution, use the same channel. That would work.

The best way to reconnect without any issues is to drop the InfoClient and run subscribe again. But keep in mind that a new channel means a new sender and receiver. If any resource is already using the receiver, you would need to reuse the old sender, so it's best to keep it handy somewhere.
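
The drop-and-resubscribe approach described above could be sketched as follows. This is an illustration under assumptions, not the SDK's API: `run_with_reconnect` and both closure parameters are hypothetical, the client type `C` is left generic, `std::sync::mpsc` stands in for tokio channels, and backoff between attempts is omitted. What it shows is that each session gets a fresh client and a clone of the same sender, so whatever holds the original receiver keeps working across reconnects.

```rust
use std::sync::mpsc::Sender;

/// Runs up to `max_sessions` sessions, each with a freshly built client that
/// is subscribed with a clone of the same `sender`. Returns the session count.
pub fn run_with_reconnect<T, C>(
    sender: &Sender<T>,
    max_sessions: usize,
    // Hypothetical: builds a new client and subscribes it with the given sender.
    mut connect_and_subscribe: impl FnMut(Sender<T>) -> Result<C, String>,
    // Hypothetical: returns once the connection is considered dead.
    mut run_until_closed: impl FnMut(&mut C),
) -> usize {
    let mut sessions = 0;
    while sessions < max_sessions {
        match connect_and_subscribe(sender.clone()) {
            Ok(mut client) => {
                sessions += 1;
                run_until_closed(&mut client);
                // `client` goes out of scope here, so the old client is
                // dropped before the next one is created.
            }
            // Give up on a failed connect; a real loop would back off and retry.
            Err(_) => break,
        }
    }
    sessions
}
```

Whether dropping the client actually stops its background tasks depends on the client implementation, so that part should be verified against the SDK version in use.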

Contributor

abrkn commented Aug 16, 2024

The stream implementation is too broken for workarounds. I have made a fresh streaming client with optional sub management: #49
