Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change(mempool): Return verification result after attempting to insert transactions in the mempool #8901

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions zebra-consensus/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,17 +584,17 @@ where
)?;

if let Some(mut mempool) = mempool {
if !transaction.transaction.transaction.outputs().is_empty() {
tokio::spawn(async move {
tokio::time::sleep(POLL_MEMPOOL_DELAY).await;
let _ = mempool
.ready()
.await
.expect("mempool poll_ready() method should not return an error")
.call(mempool::Request::CheckForVerifiedTransactions)
.await;
});
}
tokio::spawn(async move {
// Best-effort poll of the mempool to provide a timely response to
// `sendrawtransaction` RPC calls or `AwaitOutput` mempool calls.
tokio::time::sleep(POLL_MEMPOOL_DELAY).await;
let _ = mempool
.ready()
.await
.expect("mempool poll_ready() method should not return an error")
.call(mempool::Request::CheckForVerifiedTransactions)
.await;
});
}

Response::Mempool { transaction, spent_mempool_outpoints }
Expand Down
5 changes: 1 addition & 4 deletions zebra-node-services/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@ use zebra_chain::{
use crate::BoxError;

mod gossip;

mod transaction_dependencies;

pub use transaction_dependencies::TransactionDependencies;

pub use self::gossip::Gossip;
pub use self::{gossip::Gossip, transaction_dependencies::TransactionDependencies};

/// A mempool service request.
///
Expand Down
25 changes: 17 additions & 8 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use std::{

use futures::{future::FutureExt, stream::Stream};
use tokio::sync::{broadcast, oneshot};
use tokio_stream::StreamExt;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};

use zebra_chain::{
Expand All @@ -43,7 +42,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};

use crate::components::{mempool::crawler::RATE_LIMIT_DELAY, sync::SyncStatus};
use crate::components::sync::SyncStatus;

pub mod config;
mod crawler;
Expand Down Expand Up @@ -586,10 +585,8 @@ impl Service<Request> for Mempool {
let best_tip_height = self.latest_chain_tip.best_tip_height();

// Clean up completed download tasks and add to mempool if successful.
while let Poll::Ready(Some(r)) =
pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx)
{
match r {
while let Poll::Ready(Some((result, rsp_tx))) = pin!(&mut *tx_downloads).poll_next(cx) {
match result {
Ok(Ok((tx, spent_mempool_outpoints, expected_tip_height))) => {
// # Correctness:
//
Expand All @@ -609,27 +606,39 @@ impl Service<Request> for Mempool {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
}

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx
.send(insert_result.map(|_| ()).map_err(|err| err.into()));
}
} else {
tracing::trace!("chain grew during tx verification, retrying ..",);

// We don't care if re-queueing the transaction request fails.
let _result = tx_downloads
.download_if_needed_and_verify(tx.transaction.into(), None);
.download_if_needed_and_verify(tx.transaction.into(), rsp_tx);
}
}
Ok(Err((tx_id, error))) => {
tracing::debug!(?tx_id, ?error, "mempool transaction failed to verify");

metrics::counter!("mempool.failed.verify.tasks.total", "reason" => error.to_string()).increment(1);

storage.reject_if_needed(tx_id, error);
}
Err(_elapsed) => {
Err(elapsed) => {
// A timeout happens when the stream hangs waiting for another service,
// so there is no specific transaction ID.

tracing::info!("mempool transaction failed to verify due to timeout");

metrics::counter!("mempool.failed.verify.tasks.total", "reason" => "timeout").increment(1);

// Send the result to responder channel if one was provided.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(Err(elapsed.into()));
}
}
};
}
Expand Down
127 changes: 81 additions & 46 deletions zebrad/src/components/mempool/downloads.rs
arya2 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state::{self as zs, CloneError};

use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};
use crate::components::{
mempool::crawler::RATE_LIMIT_DELAY,
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};

use super::MempoolError;

Expand Down Expand Up @@ -152,16 +155,20 @@ where
/// A list of pending transaction download and verify tasks.
#[pin]
pending: FuturesUnordered<
JoinHandle<
JoinHandle<(
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(TransactionDownloadVerifyError, UnminedTxId),
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(TransactionDownloadVerifyError, UnminedTxId),
>,
tokio::time::error::Elapsed,
>,
>,
Option<oneshot::Sender<Result<(), BoxError>>>,
)>,
>,

/// A list of channels that can be used to cancel pending transaction download and
Expand All @@ -178,14 +185,20 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
type Item = Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(UnminedTxId, TransactionDownloadVerifyError),
>;
type Item = (
Result<
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
),
(UnminedTxId, TransactionDownloadVerifyError),
>,
tokio::time::error::Elapsed,
>,
Option<oneshot::Sender<Result<(), BoxError>>>,
);

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
Expand All @@ -198,20 +211,28 @@ where
// task is scheduled for wakeup when the next task becomes ready.
//
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("transaction download and verify tasks must not panic") {
Ok((tx, spent_mempool_outpoints, tip_height)) => {
let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
let (result, rsp_tx) =
join_result.expect("transaction download and verify tasks must not panic");

let result = match result {
Ok(Ok((tx, spent_mempool_outpoints, tip_height))) => {
this.cancel_handles.remove(&tx.transaction.id);
Poll::Ready(Some(Ok((tx, spent_mempool_outpoints, tip_height))))
Ok(Ok((tx, spent_mempool_outpoints, tip_height)))
}
Err((e, hash)) => {
Ok(Err((e, hash))) => {
this.cancel_handles.remove(&hash);
Poll::Ready(Some(Err((hash, e))))
Ok(Err((hash, e)))
}
}
Err(elapsed) => Err(elapsed),
};

Some((result, rsp_tx))
} else {
Poll::Ready(None)
}
None
};

Poll::Ready(item)
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand Down Expand Up @@ -255,7 +276,7 @@ where
pub fn download_if_needed_and_verify(
&mut self,
gossiped_tx: Gossip,
rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
mut rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
) -> Result<(), MempoolError> {
let txid = gossiped_tx.id();

Expand Down Expand Up @@ -381,37 +402,51 @@ where
// Tack the hash onto the error so we can remove the cancel handle
// on failure as well as on success.
.map_err(move |e| (e, txid))
.inspect(move |result| {
// Hide the transaction data to avoid filling the logs
let result = result.as_ref().map(|_tx| txid);
debug!("mempool transaction result: {result:?}");
})
.inspect(move |result| {
// Hide the transaction data to avoid filling the logs
let result = result.as_ref().map(|_tx| txid);
debug!("mempool transaction result: {result:?}");
})
.in_current_span();

let task = tokio::spawn(async move {
let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut);

// Prefer the cancel handle if both are ready.
let result = tokio::select! {
biased;
_ = &mut cancel_rx => {
trace!("task cancelled prior to completion");
metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
Err((TransactionDownloadVerifyError::Cancelled, txid))
if let Some(rsp_tx) = rsp_tx.take() {
let _ = rsp_tx.send(Err("verification cancelled".into()));
}

Ok(Err((TransactionDownloadVerifyError::Cancelled, txid)))
}
verification = fut => verification,
verification = fut => {
verification
.inspect_err(|_elapsed| {
if let Some(rsp_tx) = rsp_tx.take() {
let _ = rsp_tx.send(Err("timeout waiting for verification result".into()));
}
})
.inspect(|inner_result| {
let _ = inner_result
.as_ref()
.inspect_err(|(tx_verifier_error, tx_id)| {
if let Some(rsp_tx) = rsp_tx.take() {
let error_msg = format!(
"failed to validate tx: {tx_id}, error: {tx_verifier_error}"
);
let _ = rsp_tx.send(Err(error_msg.into()));
}
});
})
},
};

// Send the result to responder channel if one was provided.
// TODO: Wait until transactions are added to the verified set before sending an Ok to `rsp_tx`.
if let Some(rsp_tx) = rsp_tx {
let _ = rsp_tx.send(
result
.as_ref()
.map(|_| ())
.map_err(|(err, _)| err.clone().into()),
);
}

result
(result, rsp_tx)
});

self.pending.push(task);
Expand Down
18 changes: 9 additions & 9 deletions zebrad/src/components/mempool/tests/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,22 +978,22 @@ async fn mempool_responds_to_await_output() -> Result<(), Report> {
let result_rx = results.remove(0).expect("should pass initial checks");
assert!(results.is_empty(), "should have 1 result for 1 queued tx");

tokio::time::timeout(Duration::from_secs(10), result_rx)
.await
.expect("should not time out")
.expect("mempool tx verification result channel should not be closed")
.expect("mocked verification should be successful");

// Wait for next steps in mempool's Downloads to finish
// TODO: Move this and the `ready().await` below above waiting for the mempool verification result above after
// waiting to respond with a transaction's verification result until after it's been inserted into the mempool.
// Wait for post-verification steps in mempool's Downloads
tokio::time::sleep(Duration::from_secs(1)).await;

// Note: Buffered services shouldn't be polled without being called.
// See `mempool::Request::CheckForVerifiedTransactions` for more details.
mempool
.ready()
.await
.expect("polling mempool should succeed");

tokio::time::timeout(Duration::from_secs(10), result_rx)
.await
.expect("should not time out")
.expect("mempool tx verification result channel should not be closed")
.expect("mocked verification should be successful");

assert_eq!(
mempool.storage().transaction_count(),
1,
Expand Down
Loading