diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index 02f97fb7778..f9b68ba2215 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -584,17 +584,17 @@ where )?; if let Some(mut mempool) = mempool { - if !transaction.transaction.transaction.outputs().is_empty() { - tokio::spawn(async move { - tokio::time::sleep(POLL_MEMPOOL_DELAY).await; - let _ = mempool - .ready() - .await - .expect("mempool poll_ready() method should not return an error") - .call(mempool::Request::CheckForVerifiedTransactions) - .await; - }); - } + tokio::spawn(async move { + // Best-effort poll of the mempool to provide a timely response to + // `sendrawtransaction` RPC calls or `AwaitOutput` mempool calls. + tokio::time::sleep(POLL_MEMPOOL_DELAY).await; + let _ = mempool + .ready() + .await + .expect("mempool poll_ready() method should not return an error") + .call(mempool::Request::CheckForVerifiedTransactions) + .await; + }); } Response::Mempool { transaction, spent_mempool_outpoints } diff --git a/zebra-node-services/src/mempool.rs b/zebra-node-services/src/mempool.rs index 784c3ba607e..6c035e6dc44 100644 --- a/zebra-node-services/src/mempool.rs +++ b/zebra-node-services/src/mempool.rs @@ -13,12 +13,9 @@ use zebra_chain::{ use crate::BoxError; mod gossip; - mod transaction_dependencies; -pub use transaction_dependencies::TransactionDependencies; - -pub use self::gossip::Gossip; +pub use self::{gossip::Gossip, transaction_dependencies::TransactionDependencies}; /// A mempool service request. /// diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index bc5796df615..d492a97a91e 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -28,7 +28,6 @@ use std::{ use futures::{future::FutureExt, stream::Stream}; use tokio::sync::{broadcast, oneshot}; -use tokio_stream::StreamExt; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service}; use zebra_chain::{ @@ -43,7 +42,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response}; use zebra_state as zs; use zebra_state::{ChainTipChange, TipAction}; -use crate::components::{mempool::crawler::RATE_LIMIT_DELAY, sync::SyncStatus}; +use crate::components::sync::SyncStatus; pub mod config; mod crawler; @@ -586,10 +585,8 @@ impl Service for Mempool { let best_tip_height = self.latest_chain_tip.best_tip_height(); // Clean up completed download tasks and add to mempool if successful. - while let Poll::Ready(Some(r)) = - pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx) - { - match r { + while let Poll::Ready(Some((result, rsp_tx))) = pin!(&mut *tx_downloads).poll_next(cx) { + match result { Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height))) => { // # Correctness: // @@ -609,27 +606,39 @@ impl Service for Mempool { // Save transaction ids that we will send to peers send_to_peers_ids.insert(inserted_id); } + + // Send the result to responder channel if one was provided. + if let Some(rsp_tx) = rsp_tx { + let _ = rsp_tx + .send(insert_result.map(|_| ()).map_err(|err| err.into())); + } } else { tracing::trace!("chain grew during tx verification, retrying ..",); // We don't care if re-queueing the transaction request fails. let _result = tx_downloads - .download_if_needed_and_verify(tx.transaction.into(), None); + .download_if_needed_and_verify(tx.transaction.into(), rsp_tx); } } Ok(Err((tx_id, error))) => { tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify"); metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1); + storage.reject_if_needed(tx_id, error); } - Err(_elapsed) => { + Err(elapsed) => { // A timeout happens when the stream hangs waiting for another service, // so there is no specific transaction ID. tracing::info!("mempool transaction failed to verify due to timeout"); metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1); + + // Send the result to responder channel if one was provided. + if let Some(rsp_tx) = rsp_tx { + let _ = rsp_tx.send(Err(elapsed.into())); + } } }; } diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 45fd44a7c05..2d63a6bac0f 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -54,7 +54,10 @@ use zebra_network as zn; use zebra_node_services::mempool::Gossip; use zebra_state::{self as zs, CloneError}; -use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; +use crate::components::{ + mempool::crawler::RATE_LIMIT_DELAY, + sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}, +}; use super::MempoolError; @@ -152,16 +155,20 @@ where /// A list of pending transaction download and verify tasks. #[pin] pending: FuturesUnordered< - JoinHandle< + JoinHandle<( Result< - ( - VerifiedUnminedTx, - Vec, - Option, - ), - (TransactionDownloadVerifyError, UnminedTxId), + Result< + ( + VerifiedUnminedTx, + Vec, + Option, + ), + (TransactionDownloadVerifyError, UnminedTxId), + >, + tokio::time::error::Elapsed, >, - >, + Option>>, + )>, >, /// A list of channels that can be used to cancel pending transaction download and @@ -178,14 +185,20 @@ where ZS: Service + Send + Clone + 'static, ZS::Future: Send, { - type Item = Result< - ( - VerifiedUnminedTx, - Vec, - Option, - ), - (UnminedTxId, TransactionDownloadVerifyError), - >; + type Item = ( + Result< + Result< + ( + VerifiedUnminedTx, + Vec, + Option, + ), + (UnminedTxId, TransactionDownloadVerifyError), + >, + tokio::time::error::Elapsed, + >, + Option>>, + ); fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let this = self.project(); @@ -198,20 +211,28 @@ where // task is scheduled for wakeup when the next task becomes ready. // // TODO: this would be cleaner with poll_map (#2693) - if let Some(join_result) = ready!(this.pending.poll_next(cx)) { - match join_result.expect("transaction download and verify tasks must not panic") { - Ok((tx, spent_mempool_outpoints, tip_height)) => { + let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) { + let (result, rsp_tx) = + join_result.expect("transaction download and verify tasks must not panic"); + + let result = match result { + Ok(Ok((tx, spent_mempool_outpoints, tip_height))) => { this.cancel_handles.remove(&tx.transaction.id); - Poll::Ready(Some(Ok((tx, spent_mempool_outpoints, tip_height)))) + Ok(Ok((tx, spent_mempool_outpoints, tip_height))) } - Err((e, hash)) => { + Ok(Err((e, hash))) => { this.cancel_handles.remove(&hash); - Poll::Ready(Some(Err((hash, e)))) + Ok(Err((hash, e))) } - } + Err(elapsed) => Err(elapsed), + }; + + Some((result, rsp_tx)) } else { - Poll::Ready(None) - } + None + }; + + Poll::Ready(item) } fn size_hint(&self) -> (usize, Option) { @@ -255,7 +276,7 @@ where pub fn download_if_needed_and_verify( &mut self, gossiped_tx: Gossip, - rsp_tx: Option>>, + mut rsp_tx: Option>>, ) -> Result<(), MempoolError> { let txid = gossiped_tx.id(); @@ -381,37 +402,51 @@ where // Tack the hash onto the error so we can remove the cancel handle // on failure as well as on success. .map_err(move |e| (e, txid)) - .inspect(move |result| { - // Hide the transaction data to avoid filling the logs - let result = result.as_ref().map(|_tx| txid); - debug!("mempool transaction result: {result:?}"); - }) + .inspect(move |result| { + // Hide the transaction data to avoid filling the logs + let result = result.as_ref().map(|_tx| txid); + debug!("mempool transaction result: {result:?}"); + }) .in_current_span(); let task = tokio::spawn(async move { + let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut); + // Prefer the cancel handle if both are ready. let result = tokio::select! { biased; _ = &mut cancel_rx => { trace!("task cancelled prior to completion"); metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1); - Err((TransactionDownloadVerifyError::Cancelled, txid)) + if let Some(rsp_tx) = rsp_tx.take() { + let _ = rsp_tx.send(Err("verification cancelled".into())); + } + + Ok(Err((TransactionDownloadVerifyError::Cancelled, txid))) } - verification = fut => verification, + verification = fut => { + verification + .inspect_err(|_elapsed| { + if let Some(rsp_tx) = rsp_tx.take() { + let _ = rsp_tx.send(Err("timeout waiting for verification result".into())); + } + }) + .inspect(|inner_result| { + let _ = inner_result + .as_ref() + .inspect_err(|(tx_verifier_error, tx_id)| { + if let Some(rsp_tx) = rsp_tx.take() { + let error_msg = format!( + "failed to validate tx: {tx_id}, error: {tx_verifier_error}" + ); + let _ = rsp_tx.send(Err(error_msg.into())); + } + }); + }) + }, }; - // Send the result to responder channel if one was provided. - // TODO: Wait until transactions are added to the verified set before sending an Ok to `rsp_tx`. - if let Some(rsp_tx) = rsp_tx { - let _ = rsp_tx.send( - result - .as_ref() - .map(|_| ()) - .map_err(|(err, _)| err.clone().into()), - ); - } - - result + (result, rsp_tx) }); self.pending.push(task); diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 1b87097aaf1..9dd3557de9c 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -978,22 +978,22 @@ async fn mempool_responds_to_await_output() -> Result<(), Report> { let result_rx = results.remove(0).expect("should pass initial checks"); assert!(results.is_empty(), "should have 1 result for 1 queued tx"); - tokio::time::timeout(Duration::from_secs(10), result_rx) - .await - .expect("should not time out") - .expect("mempool tx verification result channel should not be closed") - .expect("mocked verification should be successful"); - - // Wait for next steps in mempool's Downloads to finish - // TODO: Move this and the `ready().await` below above waiting for the mempool verification result above after - // waiting to respond with a transaction's verification result until after it's been inserted into the mempool. + // Wait for post-verification steps in mempool's Downloads tokio::time::sleep(Duration::from_secs(1)).await; + // Note: Buffered services shouldn't be polled without being called. + // See `mempool::Request::CheckForVerifiedTransactions` for more details. mempool .ready() .await .expect("polling mempool should succeed"); + tokio::time::timeout(Duration::from_secs(10), result_rx) + .await + .expect("should not time out") + .expect("mempool tx verification result channel should not be closed") + .expect("mocked verification should be successful"); + assert_eq!( mempool.storage().transaction_count(), 1,