From 2a98bf2b4425c81c3319fae7a2525c4cab5b53fa Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Thu, 5 Oct 2023 21:30:53 +0200 Subject: [PATCH] fix(tenderdash-abci): deadlock on multiple msgs received (#41) --- abci/src/server/codec.rs | 111 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 103 insertions(+), 8 deletions(-) diff --git a/abci/src/server/codec.rs b/abci/src/server/codec.rs index b96d6cc..1da927a 100644 --- a/abci/src/server/codec.rs +++ b/abci/src/server/codec.rs @@ -10,12 +10,15 @@ use bytes::{Buf, BufMut, BytesMut}; use futures::{SinkExt, StreamExt}; use prost::Message; use proto::abci::{Request, Response}; -use tokio::sync::{ - mpsc::{self, Receiver, Sender}, - Mutex, +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, + }, }; use tokio_util::{ - codec::{Decoder, Encoder}, + codec::{Decoder, Encoder, Framed}, net::Listener, }; @@ -63,7 +66,7 @@ impl<'a> Codec { async fn worker( listener: Arc>, request_tx: Sender, - mut response_rx: Receiver, + response_rx: Receiver, cancel: CancellationToken, ) where L: Listener + Send + Sync, @@ -87,13 +90,24 @@ impl<'a> Codec { tracing::info!(?address, "accepted connection"); let stream = Box::pin(stream); - let mut codec = tokio_util::codec::Framed::new(stream, Coder {}); + let codec = Framed::new(stream, Coder {}); + Self::process_worker_queues(codec, request_tx, response_rx, cancel).await; + } + async fn process_worker_queues( + mut codec: Framed, + request_tx: Sender, + mut response_rx: Receiver, + cancel: CancellationToken, + ) { loop { tokio::select! { - request = codec.next() => match request { + // Only read next message if we have capacity in request_tx to process it. + // Otherwise, we might block the codec worker on request_tx.send() and never + // process the next message from the response_rx stream. + request = codec.next(), if request_tx.capacity() > 0 => match request { Some(Ok(i)) => { - if let Err(error) = request_tx.send(i).await { + if let Err(error) = request_tx.try_send(i) { tracing::error!(?error, "unable to forward request for processing"); cancel.cancel(); } @@ -189,3 +203,84 @@ impl Encoder for Coder { Ok(()) } } + +#[cfg(test)] +mod test { + use prost::Message; + use tenderdash_proto::abci; + use tokio::{io::AsyncWriteExt, sync::mpsc}; + use tokio_util::sync::CancellationToken; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + /// Test if a bug in the codec receiving 2 requests without a response in + /// between is fixed. + async fn test_codec_msg_msg_resp() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_ansi(true) + .try_init() + .ok(); + + let (request_tx, mut request_rx) = mpsc::channel::(1); + let (response_tx, response_rx) = mpsc::channel::(1); + let cancel = CancellationToken::new(); + + let (mut client, server) = tokio::io::duplex(10240); + + let codec = tokio_util::codec::Framed::new(server, super::Coder {}); + + let worker_cancel = cancel.clone(); + let hdl = tokio::spawn( + async move { + super::Codec::process_worker_queues(codec, request_tx, response_rx, worker_cancel) + } + .await, + ); + + // We send 2 requests over the wire + for n_requests in 0..5 { + let encoded = abci::Request { + value: Some(abci::request::Value::Echo(abci::RequestEcho { + message: format!("hello {}", n_requests), + })), + } + .encode_length_delimited_to_vec(); + + client.write_all(&encoded).await.unwrap(); + } + + // Now, wait till the codec has processed the requests + // The bug we fixed was that the codec would not process the second request + // until a response was sent. + // If the bug is still present, the test should report error here. + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + + // Then, we read one request + tracing::debug!("MAIN THREAD: reading request 1"); + request_rx.recv().await.expect("dequeue request 1"); + tracing::debug!("MAIN THREAD: dequeued request 1"); + + // Then, we send a response + tracing::debug!("MAIN THREAD: sending response 1"); + response_tx + .send(abci::Response { + value: Some(abci::response::Value::Echo(abci::ResponseEcho { + message: "hello".to_string(), + })), + }) + .await + .expect("enqueue response 1"); + tracing::debug!("MAIN THREAD: enqueued response 1"); + + // Then, we read second request + tracing::debug!("MAIN THREAD: reading request 2"); + request_rx.recv().await.expect("dequeue request 2"); + tracing::debug!("MAIN THREAD: dequeued request 2"); + + // Success :) + + // Cleanup + cancel.cancel(); + hdl.await.unwrap(); + } +}