From 866094001d4c0b119a80ed681a74b323f74eae1b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 24 Nov 2023 14:41:21 +0200 Subject: [PATCH] lightclient: Separate wakes and check connectivity on poll_read Signed-off-by: Alexandru Vasile --- lightclient/src/platform/wasm_socket.rs | 53 ++++++++++++++++++------- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index 81ac605fc0..35eec32962 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -55,8 +55,15 @@ struct InnerWasmSocket { state: ConnectionState, /// Data buffer for the socket. data: VecDeque, - /// Waker from `poll_read` / `poll_write`. - waker: Option, + + /// Waker from `poll_read` when the socket is not connected yet. + open_waker: Option, + /// Waker from `poll_read`. + read_waker: Option, + /// Waker from `poll_write` and `poll_flush`. + write_waker: Option, + /// Waker from `poll_close`. + close_waker: Option, } /// Registered callbacks of the [`WasmSocket`]. @@ -86,7 +93,10 @@ impl WasmSocket { let inner = Arc::new(Mutex::new(InnerWasmSocket { state: ConnectionState::Connecting, data: VecDeque::with_capacity(16384), - waker: None, + open_waker: None, + read_waker: None, + write_waker: None, + close_waker: None, })); let open_callback = Closure::once_into_js({ @@ -95,7 +105,7 @@ impl WasmSocket { let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Opened; - if let Some(waker) = inner.waker.take() { + if let Some(waker) = inner.open_waker.take() { waker.wake(); } } @@ -113,7 +123,7 @@ impl WasmSocket { let bytes = js_sys::Uint8Array::new(&buffer).to_vec(); inner.data.extend(bytes.into_iter()); - if let Some(waker) = inner.waker.take() { + if let Some(waker) = inner.read_waker.take() { waker.wake(); } } @@ -126,10 +136,6 @@ impl WasmSocket { // Callback does not provide useful information, signal it back to the stream. let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Error; - - if let Some(waker) = inner.waker.take() { - waker.wake(); - } } }); socket.set_onerror(Some(error_callback.as_ref().unchecked_ref())); @@ -140,7 +146,7 @@ impl WasmSocket { let mut inner = inner.lock().expect("Mutex is poised; qed"); inner.state = ConnectionState::Closed; - if let Some(waker) = inner.waker.take() { + if let Some(waker) = inner.close_waker.take() { waker.wake(); } } @@ -169,8 +175,17 @@ impl AsyncRead for WasmSocket { buf: &mut [u8], ) -> Poll> { let mut inner = self.inner.lock().expect("Mutex is poised; qed"); - inner.waker = Some(cx.waker().clone()); + // Check if the socket is ready for reading. + let state = self.socket.ready_state(); + if state == web_sys::WebSocket::CLOSED || state == web_sys::WebSocket::CLOSING { + return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); + } else if state == web_sys::WebSocket::CONNECTING { + inner.open_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + + inner.read_waker = Some(cx.waker().clone()); match inner.state { ConnectionState::Error => { Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error"))) @@ -199,7 +214,7 @@ impl AsyncWrite for WasmSocket { buf: &[u8], ) -> Poll> { let mut inner = self.inner.lock().expect("Mutex is poised; qed"); - inner.waker = Some(cx.waker().clone()); + inner.write_waker = Some(cx.waker().clone()); match inner.state { ConnectionState::Error => { @@ -221,8 +236,18 @@ impl AsyncWrite for WasmSocket { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.socket.ready_state() == web_sys::WebSocket::CLOSED { + return Poll::Ready(Ok(())); + } + + if self.socket.ready_state() != web_sys::WebSocket::CLOSING { + let _ = self.socket.close(); + } + + let mut inner = self.inner.lock().expect("Mutex is poised; qed"); + inner.close_waker = Some(cx.waker().clone()); + Poll::Pending } }