From 6b95d271d834d0ffe0bcc44bbed3f0d32fee0ed8 Mon Sep 17 00:00:00 2001 From: Arya Date: Fri, 30 Aug 2024 16:09:10 -0400 Subject: [PATCH] fix(rpc): Return verification errors from `sendrawtransaction` RPC method (#8788) * Adds a mempool request to wait for a transaction verification result and uses it in `sendrawtransaction` RPC method * removes unnecessary clone * fix clippy warnings * returns verification errors for all `mempool::Queue` requests, removes `QueueRpc` request variant * returns oneshot channel in mempool::Response::Queue * updates a test vector to check for download or verification error in mempool::response::Queued result receiver * Always require tokio as a dependency in zebra-node-services * checks for closed channel errors in sendrawtransaction and updates a prop test to check that verification errors are propagated correctly --- zebra-node-services/Cargo.toml | 4 +- zebra-node-services/src/mempool.rs | 9 ++-- zebra-rpc/src/methods.rs | 15 +++++-- zebra-rpc/src/methods/tests/prop.rs | 43 ++++++++++++++++--- zebra-rpc/src/queue/tests/prop.rs | 10 +++-- zebrad/src/components/mempool.rs | 36 ++++++++++------ .../components/mempool/crawler/tests/prop.rs | 14 ++++-- zebrad/src/components/mempool/downloads.rs | 31 +++++++++---- zebrad/src/components/mempool/tests/vector.rs | 17 +++++++- 9 files changed, 133 insertions(+), 46 deletions(-) diff --git a/zebra-node-services/Cargo.toml b/zebra-node-services/Cargo.toml index 5c188b34b40..8d0992dcf5a 100644 --- a/zebra-node-services/Cargo.toml +++ b/zebra-node-services/Cargo.toml @@ -34,7 +34,7 @@ rpc-client = [ "serde_json", ] -shielded-scan = ["tokio"] +shielded-scan = [] [dependencies] zebra-chain = { path = "../zebra-chain" , version = "1.0.0-beta.39" } @@ -48,7 +48,7 @@ jsonrpc-core = { version = "18.0.0", optional = true } reqwest = { version = "0.11.26", default-features = false, features = ["rustls-tls"], optional = true } serde = { version = "1.0.204", optional = true } serde_json = { version = "1.0.122", optional = true } -tokio = { version = "1.39.2", features = ["time"], optional = true } +tokio = { version = "1.39.2", features = ["time", "sync"] } [dev-dependencies] diff --git a/zebra-node-services/src/mempool.rs b/zebra-node-services/src/mempool.rs index 98c1969bbad..fbaaf029c75 100644 --- a/zebra-node-services/src/mempool.rs +++ b/zebra-node-services/src/mempool.rs @@ -4,6 +4,7 @@ use std::collections::HashSet; +use tokio::sync::oneshot; use zebra_chain::transaction::{self, UnminedTx, UnminedTxId}; #[cfg(feature = "getblocktemplate-rpcs")] @@ -114,13 +115,11 @@ pub enum Response { /// Returns matching cached rejected [`UnminedTxId`]s from the mempool, RejectedTransactionIds(HashSet), - /// Returns a list of queue results. - /// - /// These are the results of the initial queue checks. - /// The transaction may also fail download or verification later. + /// Returns a list of initial queue checks results and a oneshot receiver + /// for awaiting download and/or verification results. /// /// Each result matches the request at the corresponding vector index. - Queued(Vec>), + Queued(Vec>, BoxError>>), /// Confirms that the mempool has checked for recently verified transactions. CheckedForVerifiedTransactions, diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index ae5deb7a5b9..471d542922c 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -664,7 +664,7 @@ where let response = mempool.oneshot(request).await.map_server_error()?; - let queue_results = match response { + let mut queue_results = match response { mempool::Response::Queued(results) => results, _ => unreachable!("incorrect response variant from mempool service"), }; @@ -675,10 +675,17 @@ where "mempool service returned more results than expected" ); - tracing::debug!("sent transaction to mempool: {:?}", &queue_results[0]); + let queue_result = queue_results + .pop() + .expect("there should be exactly one item in Vec") + .inspect_err(|err| tracing::debug!("sent transaction to mempool: {:?}", &err)) + .map_server_error()? + .await; + + tracing::debug!("sent transaction to mempool: {:?}", &queue_result); - queue_results[0] - .as_ref() + queue_result + .map_server_error()? .map(|_| SentTransactionHash(transaction_hash)) .map_server_error() } diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index c2a9c70a348..409a6aefe52 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -7,6 +7,7 @@ use hex::ToHex; use jsonrpc_core::{Error, ErrorCode}; use proptest::{collection::vec, prelude::*}; use thiserror::Error; +use tokio::sync::oneshot; use tower::buffer::Buffer; use zebra_chain::{ @@ -61,7 +62,9 @@ proptest! { let unmined_transaction = UnminedTx::from(transaction); let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); - let response = mempool::Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = mempool::Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) @@ -111,10 +114,10 @@ proptest! { .expect("Transaction serializes successfully"); let transaction_hex = hex::encode(&transaction_bytes); - let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); + let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex.clone())); let unmined_transaction = UnminedTx::from(transaction); - let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]); + let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]); mempool .expect_request(expected_request) @@ -138,6 +141,32 @@ proptest! { "Result is not a server error: {result:?}" ); + let send_task = tokio::spawn(rpc.send_raw_transaction(transaction_hex)); + + let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]); + + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Err("any verification error".into())); + mempool + .expect_request(expected_request) + .await? + .respond(Ok::<_, BoxError>(mempool::Response::Queued(vec![Ok(rsp_rx)]))); + + let result = send_task + .await + .expect("Sending raw transactions should not panic"); + + prop_assert!( + matches!( + result, + Err(Error { + code: ErrorCode::ServerError(_), + .. + }) + ), + "Result is not a server error: {result:?}" + ); + // The queue task should continue without errors or panics let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never(); prop_assert!(rpc_tx_queue_task_result.is_none()); @@ -897,7 +926,9 @@ proptest! { // now a retry will be sent to the mempool let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(tx_unmined.clone())]); - let response = mempool::Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = mempool::Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) @@ -997,7 +1028,9 @@ proptest! { for tx in txs.clone() { let expected_request = mempool::Request::Queue(vec![mempool::Gossip::Tx(UnminedTx::from(tx))]); - let response = mempool::Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = mempool::Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) diff --git a/zebra-rpc/src/queue/tests/prop.rs b/zebra-rpc/src/queue/tests/prop.rs index 1db9a340f2e..9f63ecce24d 100644 --- a/zebra-rpc/src/queue/tests/prop.rs +++ b/zebra-rpc/src/queue/tests/prop.rs @@ -5,7 +5,7 @@ use std::{collections::HashSet, env, sync::Arc}; use proptest::prelude::*; use chrono::Duration; -use tokio::time; +use tokio::{sync::oneshot, time}; use tower::ServiceExt; use zebra_chain::{ @@ -196,7 +196,9 @@ proptest! { let request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); let expected_request = Request::Queue(vec![Gossip::Tx(unmined_transaction.clone())]); let send_task = tokio::spawn(mempool.clone().oneshot(request)); - let response = Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) @@ -337,7 +339,9 @@ proptest! { // retry will queue the transaction to mempool let gossip = Gossip::Tx(UnminedTx::from(transaction.clone())); let expected_request = Request::Queue(vec![gossip]); - let response = Response::Queued(vec![Ok(())]); + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + let response = Response::Queued(vec![Ok(rsp_rx)]); mempool .expect_request(expected_request) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 2d9b2b3e0c5..05732ddaac2 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -27,7 +27,7 @@ use std::{ }; use futures::{future::FutureExt, stream::Stream}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, oneshot}; use tokio_stream::StreamExt; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; @@ -560,7 +560,7 @@ impl Service for Mempool { for tx in tx_retries { // This is just an efficiency optimisation, so we don't care if queueing // transaction requests fails. - let _result = tx_downloads.download_if_needed_and_verify(tx); + let _result = tx_downloads.download_if_needed_and_verify(tx, None); } } @@ -608,8 +608,8 @@ impl Service for Mempool { tracing::trace!("chain grew during tx verification, retrying ..",); // We don't care if re-queueing the transaction request fails. - let _result = - tx_downloads.download_if_needed_and_verify(tx.transaction.into()); + let _result = tx_downloads + .download_if_needed_and_verify(tx.transaction.into(), None); } } Ok(Err((txid, error))) => { @@ -758,16 +758,24 @@ impl Service for Mempool { Request::Queue(gossiped_txs) => { trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request"); - let rsp: Vec> = gossiped_txs - .into_iter() - .map(|gossiped_tx| -> Result<(), MempoolError> { - storage.should_download_or_verify(gossiped_tx.id())?; - tx_downloads.download_if_needed_and_verify(gossiped_tx)?; - - Ok(()) - }) - .map(|result| result.map_err(BoxError::from)) - .collect(); + let rsp: Vec>, BoxError>> = + gossiped_txs + .into_iter() + .map( + |gossiped_tx| -> Result< + oneshot::Receiver>, + MempoolError, + > { + let (rsp_tx, rsp_rx) = oneshot::channel(); + storage.should_download_or_verify(gossiped_tx.id())?; + tx_downloads + .download_if_needed_and_verify(gossiped_tx, Some(rsp_tx))?; + + Ok(rsp_rx) + }, + ) + .map(|result| result.map_err(BoxError::from)) + .collect(); // We've added transactions to the queue self.update_metrics(); diff --git a/zebrad/src/components/mempool/crawler/tests/prop.rs b/zebrad/src/components/mempool/crawler/tests/prop.rs index fa1e3ef5785..524d754cfdc 100644 --- a/zebrad/src/components/mempool/crawler/tests/prop.rs +++ b/zebrad/src/components/mempool/crawler/tests/prop.rs @@ -6,7 +6,7 @@ use proptest::{ collection::{hash_set, vec}, prelude::*, }; -use tokio::time; +use tokio::{sync::oneshot, time}; use zebra_chain::{ chain_sync_status::ChainSyncStatus, parameters::Network, transaction::UnminedTxId, @@ -317,9 +317,17 @@ async fn respond_to_queue_request( expected_transaction_ids: HashSet, response: impl IntoIterator>, ) -> Result<(), TestCaseError> { - let response = response + let response: Vec>, BoxError>> = response .into_iter() - .map(|result| result.map_err(BoxError::from)) + .map(|result| { + result + .map(|_| { + let (rsp_tx, rsp_rx) = oneshot::channel(); + let _ = rsp_tx.send(Ok(())); + rsp_rx + }) + .map_err(BoxError::from) + }) .collect(); mempool diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index d3f62b4087b..b37f988dcc8 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -51,7 +51,7 @@ use zebra_chain::{ use zebra_consensus::transaction as tx; use zebra_network as zn; use zebra_node_services::mempool::Gossip; -use zebra_state as zs; +use zebra_state::{self as zs, CloneError}; use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; @@ -105,17 +105,17 @@ pub const MAX_INBOUND_CONCURRENCY: usize = 25; struct CancelDownloadAndVerify; /// Errors that can occur while downloading and verifying a transaction. -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] #[allow(dead_code)] pub enum TransactionDownloadVerifyError { #[error("transaction is already in state")] InState, #[error("error in state service")] - StateError(#[source] BoxError), + StateError(#[source] CloneError), #[error("error downloading transaction")] - DownloadFailed(#[source] BoxError), + DownloadFailed(#[source] CloneError), #[error("transaction download / verification was cancelled")] Cancelled, @@ -243,6 +243,7 @@ where pub fn download_if_needed_and_verify( &mut self, gossiped_tx: Gossip, + rsp_tx: Option>>, ) -> Result<(), MempoolError> { let txid = gossiped_tx.id(); @@ -295,7 +296,7 @@ where Ok((Some(height), next_height)) } Ok(_) => unreachable!("wrong response"), - Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), + Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())), }?; trace!(?txid, ?next_height, "got next height"); @@ -307,11 +308,12 @@ where let tx = match network .oneshot(req) .await + .map_err(CloneError::from) .map_err(TransactionDownloadVerifyError::DownloadFailed)? { zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| { TransactionDownloadVerifyError::DownloadFailed( - "no transactions returned".into(), + BoxError::from("no transactions returned").into(), ) })?, _ => unreachable!("wrong response to transaction request"), @@ -373,7 +375,7 @@ where let task = tokio::spawn(async move { // Prefer the cancel handle if both are ready. - tokio::select! { + let result = tokio::select! { biased; _ = &mut cancel_rx => { trace!("task cancelled prior to completion"); @@ -381,7 +383,19 @@ where Err((TransactionDownloadVerifyError::Cancelled, txid)) } verification = fut => verification, + }; + + // Send the result to responder channel if one was provided. + if let Some(rsp_tx) = rsp_tx { + let _ = rsp_tx.send( + result + .as_ref() + .map(|_| ()) + .map_err(|(err, _)| err.clone().into()), + ); } + + result }); self.pending.push(task); @@ -458,6 +472,7 @@ where match state .ready() .await + .map_err(CloneError::from) .map_err(TransactionDownloadVerifyError::StateError)? .call(zs::Request::Transaction(txid.mined_id())) .await @@ -465,7 +480,7 @@ where Ok(zs::Response::Transaction(None)) => Ok(()), Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState), Ok(_) => unreachable!("wrong response"), - Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), + Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())), }?; Ok(()) diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 2868fef2e65..c285923fa7d 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -445,12 +445,17 @@ async fn mempool_cancel_mined() -> Result<(), Report> { .call(Request::Queue(vec![txid.into()])) .await .unwrap(); - let queued_responses = match response { + let mut queued_responses = match response { Response::Queued(queue_responses) => queue_responses, _ => unreachable!("will never happen in this test"), }; assert_eq!(queued_responses.len(), 1); - assert!(queued_responses[0].is_ok()); + + let queued_response = queued_responses + .pop() + .expect("already checked that there is exactly 1 item in Vec") + .expect("initial queue checks result should be Ok"); + assert_eq!(mempool.tx_downloads().in_flight(), 1); // Push block 2 to the state @@ -489,6 +494,14 @@ async fn mempool_cancel_mined() -> Result<(), Report> { // Check if download was cancelled. assert_eq!(mempool.tx_downloads().in_flight(), 0); + assert!( + queued_response + .await + .expect("channel should not be closed") + .is_err(), + "queued tx should fail to download and verify due to chain tip change" + ); + Ok(()) }