
Asynchronous Client: "Error while parsing an incomplete packet socketio" on first heartbeat killing the connection #311

Closed
sirkrypt0 opened this issue Apr 26, 2023 · 2 comments · Fixed by #312

Comments

sirkrypt0 (Contributor) commented Apr 26, 2023

Hi! Thanks for spending your time working on this library. It's great to see support for an asynchronous client.

Problem Description

Running the asynchronous client for a bit longer (around 25 seconds) crashes it with the error "Error while parsing an incomplete packet socketio".
This also pins one CPU core at 100% utilization, suggesting busy waiting or a similar tight loop.

While debugging this, I found that the crash appears to be triggered by the first heartbeat: the roughly 25 seconds before the error match the engine.io server's default ping interval.

Steps To Reproduce

Run this Python server:

import socketio
import eventlet

sio = socketio.Server(async_mode='eventlet')
app = socketio.WSGIApp(sio)

@sio.event
def connect(sid, environ):
    print('Client connected:', sid)

@sio.event
def disconnect(sid):
    print('Client disconnected:', sid)

@sio.event
def foo(sid, data):
    print('foo event received:', data)

if __name__ == '__main__':
    eventlet.wsgi.server(eventlet.listen(('localhost', 4200)), app)

Connect with this Rust client:

use futures_util::FutureExt;
use log::{error, info};
use rust_socketio::{
    asynchronous::{Client, ClientBuilder},
    Event, Payload, TransportType,
};
use serde_json::json;
use std::time::Duration;
use tokio::{
    signal::{
        self,
        unix::{signal, SignalKind},
    },
    time::sleep,
};

#[tokio::main]
async fn main() {
    env_logger::Builder::new().parse_default_env().init();

    info!("Starting");

    // define a callback which is called when a payload is received
    // this callback gets the payload as well as an instance of the
    // socket to communicate with the server
    let callback = |payload: Payload, socket: Client| {
        async move {
            match payload {
                Payload::String(str) => println!("Received: {}", str),
                Payload::Binary(bin_data) => println!("Received bytes: {:#?}", bin_data),
            }
            socket
                .emit("test", json!({"got ack": true}))
                .await
                .expect("Server unreachable");
        }
        .boxed()
    };

    // build a client that connects to the default namespace on the local server
    let socket = ClientBuilder::new("http://localhost:4200/")
        .transport_type(TransportType::Websocket)
        .on(Event::Connect, |_payload, _client| {
            async move {
                info!("Connected!");
            }
            .boxed()
        })
        .on(Event::Close, |_payload, _client| {
            async move {
                info!("Connection closed!");
            }
            .boxed()
        })
        .on(Event::Error, |err, _| {
            async move { error!("Error: {:#?}", err) }.boxed()
        })
        .on("test", callback)
        .connect()
        .await
        .expect("Connection failed");

    // emit to the "foo" event
    let json_payload = json!({"token": 123});
    socket
        .emit("foo", json_payload)
        .await
        .expect("Server unreachable");

    info!("Sent foo event");

    info!("Waiting 30 seconds");
    sleep(Duration::from_secs(30)).await;

    let json_payload = json!({"token": 123});
    socket
        .emit("foo", json_payload)
        .await
        .expect("Server unreachable");

    info!("Sent foo event");

    let mut sigterm = signal(SignalKind::terminate()).expect("unable to create sigterm signal");
    tokio::select! {
        _ = signal::ctrl_c() => {
            info!("Received SIGINT, shutting down.");
        },
        _ = sigterm.recv() => {
            info!("Received SIGTERM, shutting down.");
        },
    }

    socket.disconnect().await.expect("Disconnect failed");
}

Here's the corresponding Cargo.toml:

[package]
name = "client"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rust_socketio = { version = "0.4.1-alpha.1", features = ["async"] }
env_logger = { version = "0.10" }
log = "0.4"
tokio = { version = "1", features = ["macros", "rt", "signal", "time"] }
futures-util = "0.3"

Notice that after about 25 seconds the error is printed. While the second foo event can still be sent out, waiting some more time disconnects the client, as shown in the output of the Python server.
Moreover, notice how the CPU load increases to 100% on one core.

Possible Solution

Digging through the code, I found that the asynchronous implementation of the packet handling in Socket::stream:

fn stream(
    client: EngineClient,
    is_connected: Arc<AtomicBool>,
) -> Pin<Box<impl Stream<Item = Result<Packet>> + Send>> {
    Box::pin(try_stream! {
        for await received_data in client.clone() {
            let packet = received_data?;
            let packet = Self::handle_engineio_packet(packet, client.clone()).await?;
            Self::handle_socketio_packet(&packet, is_connected.clone());
            yield packet;
        }
    })
}

is missing a check for packet.packet_id == EnginePacketId::Message || packet.packet_id == EnginePacketId::MessageBinary, similar to the one in the synchronous version:

pub(crate) fn poll(&self) -> Result<Option<Packet>> {
    loop {
        match self.engine_client.poll() {
            Ok(Some(packet)) => {
                if packet.packet_id == EnginePacketId::Message
                    || packet.packet_id == EnginePacketId::MessageBinary
                {
                    let packet = self.handle_engineio_packet(packet)?;
                    self.handle_socketio_packet(&packet);
                    return Ok(Some(packet));
                } else {
                    continue;
                }
            }
            Ok(None) => {
                return Ok(None);
            }
            Err(err) => return Err(err.into()),
        }
    }
}

Adding that check appears to fix the problem.
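
For reference, here is a minimal sketch of what the async stream could look like with that check added. It simply combines the two snippets above (same try_stream! structure); the actual fix is in #312:

fn stream(
    client: EngineClient,
    is_connected: Arc<AtomicBool>,
) -> Pin<Box<impl Stream<Item = Result<Packet>> + Send>> {
    Box::pin(try_stream! {
        for await received_data in client.clone() {
            let packet = received_data?;
            // Only engine.io Message / MessageBinary packets carry socket.io
            // payloads; skip heartbeats and other engine.io control packets
            // instead of handing them to the socket.io parser.
            if packet.packet_id == EnginePacketId::Message
                || packet.packet_id == EnginePacketId::MessageBinary
            {
                let packet = Self::handle_engineio_packet(packet, client.clone()).await?;
                Self::handle_socketio_packet(&packet, is_connected.clone());
                yield packet;
            }
        }
    })
}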

I created #312 which solves that.

1c3t3a (Owner) commented Apr 27, 2023

Hi @sirkrypt0, thanks a lot for catching the bug and explaining the fix so thoroughly! Very much appreciated! :) I will review the PR shortly.

1c3t3a (Owner) commented Apr 27, 2023

Let me know if you need a release of this soon; otherwise I'll probably do one over the weekend!
