Asynchronous Client: Cannot use await in callbacks #313

Closed
felix-gohla opened this issue Apr 28, 2023 · 0 comments · Fixed by #314
felix-gohla commented Apr 28, 2023

Hi! Thanks for spending your time working on this library. It's great to see support for an asynchronous client.

Problem Description

The given example for the asynchronous client only works if the callback contains no await. Code after the first await in the callback is never executed.
Thus, calling async functions from a callback is not possible.

When debugging, I found out that this is caused by the Stream implementation for the async client.

Steps To Reproduce

Run this Python server:

import socketio
import eventlet

sio = socketio.Server(async_mode='eventlet')
app = socketio.WSGIApp(sio)

@sio.event
def connect(sid, environ):
    print('Client connected:', sid)

@sio.event
def disconnect(sid):
    print('Client disconnected:', sid)

@sio.event
def foo(sid, data):
    print('foo event received:', data)

if __name__ == '__main__':
    eventlet.wsgi.server(eventlet.listen(('localhost', 4200)), app)

Connect with this Rust client and see that there is no output (note that "Connected!" will not be printed):

use futures_util::FutureExt;
use log::info;
use rust_socketio::{asynchronous::ClientBuilder, Event, TransportType};
use std::time::Duration;
use tokio::{
    signal::{
        self,
        unix::{signal, SignalKind},
    },
    time::sleep,
};

#[tokio::main]
async fn main() {
    env_logger::Builder::new().parse_default_env().init();

    info!("Starting");

    // build a client that connects to the default namespace
    let socket = ClientBuilder::new("http://localhost:4200/")
        .transport_type(TransportType::Websocket)
        .on(Event::Connect, |_payload, _client| {
            {
                async move {
                    sleep(Duration::from_secs(1)).await;
                    info!("Connected!");
                }
                .boxed()
            }
        })
        .connect()
        .await
        .expect("Connection failed");

    let mut sigterm = signal(SignalKind::terminate()).expect("unable to create sigterm signal");
    tokio::select! {
        _ = signal::ctrl_c() => {
            info!("Received SIGINT, shutting down.");
        },
        _ = sigterm.recv() => {
            info!("Received SIGTERM, shutting down.");
        },
    }

    socket.disconnect().await.expect("Disconnect failed");
}

Here's the corresponding Cargo.toml:

[package]
name = "client"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rust_socketio = { version = "0.4.1-alpha.1", features = ["async"] }
env_logger = { version = "0.10" }
log = "0.4"
tokio = { version = "1", features = ["macros", "rt", "signal", "time"] }
futures-util = "0.3"

It seems like the callback is aborted on the first await.

Possible Solution

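While debugging, I found that the client's Stream implementation uses the ready! macro. This macro only yields a future's result if the future completes on that very poll; otherwise it returns Poll::Pending from poll_next. Because the callback future is created anew with Box::pin on every call to poll_next, a future that is still pending gets dropped, so the code after its first await never runs: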

impl Stream for Client {
    type Item = Result<Packet>;
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        loop {
            // poll for the next payload
            let next = ready!(self.socket.poll_next_unpin(cx));
            match next {
                None => {
                    // end the stream if the underlying one is closed
                    return Poll::Ready(None);
                }
                Some(Err(err)) => {
                    // call the error callback
                    ready!(Box::pin(self.callback(&Event::Error, err.to_string())).poll_unpin(cx))?;
                    return Poll::Ready(Some(Err(err)));
                }
                Some(Ok(packet)) => {
                    // if this packet is not meant for the current namespace, skip it and poll for the next one
                    if packet.nsp == self.nsp {
                        ready!(Box::pin(self.handle_socketio_packet(&packet)).poll_unpin(cx))?;
                        return Poll::Ready(Some(Ok(packet)));
                    }
                }
            }
        }
    }
}
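
To illustrate the effect in isolation, here is a minimal sketch that uses only the standard library and is not the library's code. `YieldOnce`, `poll_fresh`, `poll_stored`, `NoopWaker` and `count_polls_until_ready` are hypothetical names made up for this example. `YieldOnce` stands in for a callback that awaits once: it is pending on its first poll and ready on its second. Polling a freshly created future each time (as the `Box::pin(...)` calls above do) never gets past the first poll, while polling a stored future completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a callback with one `await`:
/// pending on the first poll, ready on the second.
struct YieldOnce {
    polled: bool,
}

impl Future for YieldOnce {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.polled {
            Poll::Ready("callback finished")
        } else {
            self.polled = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Problematic pattern: a new future is created on every call, so a
/// pending future is dropped when `ready!` returns early.
fn poll_fresh(cx: &mut Context<'_>) -> Poll<&'static str> {
    let mut fut = Box::pin(YieldOnce { polled: false });
    let out = ready!(fut.as_mut().poll(cx));
    Poll::Ready(out)
}

/// Working pattern: the caller keeps the future alive between polls.
fn poll_stored(fut: Pin<&mut YieldOnce>, cx: &mut Context<'_>) -> Poll<&'static str> {
    let out = ready!(fut.poll(cx));
    Poll::Ready(out)
}

/// Waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Calls `poll` up to `max` times and reports on which call it became ready.
fn count_polls_until_ready(
    mut poll: impl FnMut(&mut Context<'_>) -> Poll<&'static str>,
    max: usize,
) -> Option<usize> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    for n in 1..=max {
        if poll(&mut cx).is_ready() {
            return Some(n);
        }
    }
    None
}

fn main() {
    let fresh = count_polls_until_ready(poll_fresh, 10);
    let mut stored_fut = Box::pin(YieldOnce { polled: false });
    let stored = count_polls_until_ready(|cx| poll_stored(stored_fut.as_mut(), cx), 10);
    println!("fresh future per poll: {:?}", fresh); // None
    println!("stored future: {:?}", stored); // Some(2)
}
```

In the real `poll_next` there is a second cost: the packet has already been taken from the socket by the time the callback future is dropped, so it is not handled again on the next poll.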

One possible solution I found is to add a new function on the Client that uses futures_util::stream::unfold instead of implementing Stream (something like this):

pub(crate) fn stream(&mut self) -> impl Stream<Item = Result<Packet>> + '_ {
    stream::unfold(self.socket.clone(), |mut socket| async {
        let packet: Option<std::result::Result<Packet, Error>> = socket.clone().next().await;
        match packet {
            None => None,
            Some(Err(err)) => {
                let res = self.callback(&Event::Error, err.to_string()).await;
                Some((Err(err), socket))
            }
            Some(Ok(packet)) => {
                self.handle_socketio_packet(&packet).await;
                Some((Ok(packet), socket))
            }
        }
    })
}
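
The reason an async step helps can be shown with a rough, std-only analogue. It does not use `futures_util::stream::unfold` or any rust_socketio types, and `YieldOnce`, `handle_packet`, `next_item`, `block_on` and `run` are hypothetical names for this sketch. Receiving a packet and handling it live in one `async fn`, so the same state machine is polled until it completes and the code after the handler's `await` does run.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that is pending once, then ready.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Hypothetical packet handler that contains an `await`.
async fn handle_packet(packet: u32, log: &mut Vec<String>) {
    log.push(format!("before await: {packet}"));
    YieldOnce(false).await;
    log.push(format!("after await: {packet}"));
}

/// One step of an unfold-style stream: receive, handle, then yield the item.
async fn next_item(packets: &mut VecDeque<u32>, log: &mut Vec<String>) -> Option<u32> {
    let packet = packets.pop_front()?;
    handle_packet(packet, log).await;
    Some(packet)
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor: keeps polling the SAME pinned future.
/// Assumption: good enough for a demo, not for real use.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Drains all packets and returns the yielded items and the handler log.
fn run(mut packets: VecDeque<u32>) -> (Vec<u32>, Vec<String>) {
    let mut log = Vec::new();
    let mut yielded = Vec::new();
    while let Some(packet) = block_on(next_item(&mut packets, &mut log)) {
        yielded.push(packet);
    }
    (yielded, log)
}

fn main() {
    let (yielded, log) = run(VecDeque::from(vec![1, 2]));
    println!("yielded: {:?}", yielded);
    for line in &log {
        println!("{line}");
    }
}
```

With `stream::unfold`, the future returned by the step closure is kept inside the stream between polls in the same way, which is why the `await`s in the proposal above can make progress.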

I created #314 as a potential fix.
