Skip to content

Commit

Permalink
fix(tenderdash-abci): deadlock on multiple msgs received (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek authored Oct 5, 2023
1 parent 3280669 commit 2a98bf2
Showing 1 changed file with 103 additions and 8 deletions.
111 changes: 103 additions & 8 deletions abci/src/server/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ use bytes::{Buf, BufMut, BytesMut};
use futures::{SinkExt, StreamExt};
use prost::Message;
use proto::abci::{Request, Response};
use tokio::sync::{
mpsc::{self, Receiver, Sender},
Mutex,
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{
mpsc::{self, Receiver, Sender},
Mutex,
},
};
use tokio_util::{
codec::{Decoder, Encoder},
codec::{Decoder, Encoder, Framed},
net::Listener,
};

Expand Down Expand Up @@ -63,7 +66,7 @@ impl<'a> Codec {
async fn worker<L>(
listener: Arc<Mutex<L>>,
request_tx: Sender<proto::abci::Request>,
mut response_rx: Receiver<proto::abci::Response>,
response_rx: Receiver<proto::abci::Response>,
cancel: CancellationToken,
) where
L: Listener + Send + Sync,
Expand All @@ -87,13 +90,24 @@ impl<'a> Codec {
tracing::info!(?address, "accepted connection");

let stream = Box::pin(stream);
let mut codec = tokio_util::codec::Framed::new(stream, Coder {});
let codec = Framed::new(stream, Coder {});

Self::process_worker_queues(codec, request_tx, response_rx, cancel).await;
}
async fn process_worker_queues<L: AsyncRead + AsyncWrite + Unpin>(
mut codec: Framed<L, Coder>,
request_tx: Sender<proto::abci::Request>,
mut response_rx: Receiver<proto::abci::Response>,
cancel: CancellationToken,
) {
loop {
tokio::select! {
request = codec.next() => match request {
// Only read next message if we have capacity in request_tx to process it.
// Otherwise, we might block the codec worker on request_tx.send() and never
// process the next message from the response_rx stream.
request = codec.next(), if request_tx.capacity() > 0 => match request {
Some(Ok(i)) => {
if let Err(error) = request_tx.send(i).await {
if let Err(error) = request_tx.try_send(i) {
tracing::error!(?error, "unable to forward request for processing");
cancel.cancel();
}
Expand Down Expand Up @@ -189,3 +203,84 @@ impl Encoder<proto::abci::Response> for Coder {
Ok(())
}
}

#[cfg(test)]
mod test {
use prost::Message;
use tenderdash_proto::abci;
use tokio::{io::AsyncWriteExt, sync::mpsc};
use tokio_util::sync::CancellationToken;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
/// Test if a bug in the codec receiving 2 requests without a response in
/// between is fixed.
async fn test_codec_msg_msg_resp() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_ansi(true)
.try_init()
.ok();

let (request_tx, mut request_rx) = mpsc::channel::<abci::Request>(1);
let (response_tx, response_rx) = mpsc::channel::<abci::Response>(1);
let cancel = CancellationToken::new();

let (mut client, server) = tokio::io::duplex(10240);

let codec = tokio_util::codec::Framed::new(server, super::Coder {});

let worker_cancel = cancel.clone();
let hdl = tokio::spawn(
async move {
super::Codec::process_worker_queues(codec, request_tx, response_rx, worker_cancel)
}
.await,
);

// We send 2 requests over the wire
for n_requests in 0..5 {
let encoded = abci::Request {
value: Some(abci::request::Value::Echo(abci::RequestEcho {
message: format!("hello {}", n_requests),
})),
}
.encode_length_delimited_to_vec();

client.write_all(&encoded).await.unwrap();
}

// Now, wait till the codec has processed the requests
// The bug we fixed was that the codec would not process the second request
// until a response was sent.
// If the bug is still present, the test should report error here.
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

// Then, we read one request
tracing::debug!("MAIN THREAD: reading request 1");
request_rx.recv().await.expect("dequeue request 1");
tracing::debug!("MAIN THREAD: dequeued request 1");

// Then, we send a response
tracing::debug!("MAIN THREAD: sending response 1");
response_tx
.send(abci::Response {
value: Some(abci::response::Value::Echo(abci::ResponseEcho {
message: "hello".to_string(),
})),
})
.await
.expect("enqueue response 1");
tracing::debug!("MAIN THREAD: enqueued response 1");

// Then, we read second request
tracing::debug!("MAIN THREAD: reading request 2");
request_rx.recv().await.expect("dequeue request 2");
tracing::debug!("MAIN THREAD: dequeued request 2");

// Success :)

// Cleanup
cancel.cancel();
hdl.await.unwrap();
}
}

0 comments on commit 2a98bf2

Please sign in to comment.