Skip to content

Commit

Permalink
[Based on #195 but zero-copy] Log ws message on parsing error to Mess…
Browse files Browse the repository at this point in the history
…age enum (#204)

* Log raw WS messages to facilitate the debugging

* Add debug-raw-ws-messages feature

* Emit an error on WsMessage::Text fails to be parsed

* Wake the task before returning Poll::Pending

---------

Co-authored-by: MOZGIII <[email protected]>
  • Loading branch information
ryo33 and MOZGIII authored Feb 14, 2024
1 parent 8d039e5 commit 3489675
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
33 changes: 23 additions & 10 deletions src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::ready;

use async_tungstenite::tungstenite::Message as WsMessage;
use async_tungstenite::{tungstenite::protocol::WebSocketConfig, WebSocketStream};
use futures::stream::Stream;
use futures::task::{Context, Poll};
Expand Down Expand Up @@ -132,21 +133,33 @@ impl<T: EventMessage + Unpin> Stream for Connection<T> {
pin.pending_flush = Some(call);
}
}

break;
}

// read from the ws
match ready!(pin.ws.poll_next_unpin(cx)) {
Some(Ok(msg)) => match serde_json::from_slice::<Message<T>>(&msg.into_data()) {
Ok(msg) => {
tracing::trace!("Received {:?}", msg);
Poll::Ready(Some(Ok(msg)))
}
Err(err) => {
tracing::error!("Failed to deserialize WS response {}", err);
Poll::Ready(Some(Err(err.into())))
}
},
Some(Ok(WsMessage::Text(text))) => {
let ready = match serde_json::from_str::<Message<T>>(&text) {
Ok(msg) => {
tracing::trace!("Received {:?}", msg);
Ok(msg)
}
Err(err) => {
tracing::debug!(target: "chromiumoxide::conn::raw_ws::parse_errors", msg = text, "Failed to parse raw WS message");
tracing::error!("Failed to deserialize WS response {}", err);
Err(err.into())
}
};
Poll::Ready(Some(ready))
}
Some(Ok(WsMessage::Close(_))) => Poll::Ready(None),
// ignore ping and pong
Some(Ok(WsMessage::Ping(_))) | Some(Ok(WsMessage::Pong(_))) => {
cx.waker().wake_by_ref();
Poll::Pending
}
Some(Ok(msg)) => Poll::Ready(Some(Err(CdpError::UnexpectedWsMessage(msg)))),
Some(Err(err)) => Poll::Ready(Some(Err(CdpError::Ws(err)))),
None => {
// ws connection closed
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::process::ExitStatus;
use std::time::Instant;

use async_tungstenite::tungstenite;
use async_tungstenite::tungstenite::Message;
use base64::DecodeError;
use futures::channel::mpsc::SendError;
use futures::channel::oneshot::Canceled;
Expand All @@ -28,6 +29,8 @@ pub enum CdpError {
Chrome(#[from] chromiumoxide_types::Error),
#[error("Received no response from the chromium instance.")]
NoResponse,
#[error("Received unexpected ws message: {0:?}")]
UnexpectedWsMessage(Message),
#[error("{0}")]
ChannelSendError(#[from] ChannelError),
#[error("Browser process exited with status {0:?} before websocket URL could be resolved, stderr: {1:?}")]
Expand Down

0 comments on commit 3489675

Please sign in to comment.