From c8f8e6dc345d003da90187d7bd9d6306b8aca0dc Mon Sep 17 00:00:00 2001 From: lklimek <842586+lklimek@users.noreply.github.com> Date: Thu, 28 Nov 2024 12:26:01 +0100 Subject: [PATCH] fix(abci): invalid error returned when codec terminates (#117) * chore: improve debugging * chore: trying to fix the test * fix(abci): invalid error returned when Codec terminates --- abci/src/server/codec.rs | 19 ++++++++++++++++--- abci/tests/kvstore.rs | 8 ++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/abci/src/server/codec.rs b/abci/src/server/codec.rs index 8996d71..dd38dc4 100644 --- a/abci/src/server/codec.rs +++ b/abci/src/server/codec.rs @@ -97,6 +97,18 @@ impl<'a> Codec { Self::process_worker_queues(codec, request_tx, response_rx, cancel).await; } + + /// Worker that moves messages between codec and requests and responses + /// queues. + /// + /// Reads messages from ABCI Client from `codec`, sends them to Tenderdash + /// via `request_tx`, receives Tenderdash responses from `response_rx` + /// and forwards to + /// + /// ## Error handling + /// + /// On error, it cancels the `cancel` [CancellationToken] and exits. + /// It causes `response_rx` to be closed. async fn process_worker_queues( mut codec: Framed, request_tx: Sender, @@ -149,9 +161,10 @@ impl<'a> Codec { } pub fn send(&self, value: Response) -> Result<(), Error> { - self.response_tx - .blocking_send(value) - .map_err(|e| Error::Async(e.to_string())) + self.response_tx.blocking_send(value).map_err(|_| { + // channel closed, `process_worker_queues` either errored or cancelled + Error::Cancelled() + }) } } diff --git a/abci/tests/kvstore.rs b/abci/tests/kvstore.rs index 92f04bc..e3cc394 100644 --- a/abci/tests/kvstore.rs +++ b/abci/tests/kvstore.rs @@ -63,10 +63,12 @@ fn test_kvstore() { fs::set_permissions(SOCKET, perms).expect("set perms"); let socket_uri = bind_address.to_string(); - let _td = common::docker::TenderdashDocker::new("tenderdash", None, &socket_uri); + let td = common::docker::TenderdashDocker::new("tenderdash", None, &socket_uri); + let next_client = server.next_client(); + tracing::debug!(?next_client, "next client"); assert!(matches!( - server.next_client(), + next_client, Err(tenderdash_abci::Error::Cancelled()) )); drop(server); @@ -74,6 +76,8 @@ fn test_kvstore() { let kvstore_app = kvstore.into_inner().expect("kvstore lock is poisoned"); assert_eq!(kvstore_app.persisted_state, state_reference); assert_eq!(kvstore_app.last_block_height, 1); + + drop(td); } /// An example storage.