diff --git a/Cargo.lock b/Cargo.lock index 136aea8b204..65d152425e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4566,6 +4566,8 @@ dependencies = [ "lazy_static", "metrics", "once_cell", + "proptest", + "proptest-derive", "rand 0.7.3", "rand 0.8.4", "serde", diff --git a/zebra-chain/src/parameters/network_upgrade.rs b/zebra-chain/src/parameters/network_upgrade.rs index 0f2c5cba5d9..4255d26a469 100644 --- a/zebra-chain/src/parameters/network_upgrade.rs +++ b/zebra-chain/src/parameters/network_upgrade.rs @@ -10,11 +10,15 @@ use std::ops::Bound::*; use chrono::{DateTime, Duration, Utc}; +#[cfg(any(test, feature = "proptest-impl"))] +use proptest_derive::Arbitrary; + /// A Zcash network upgrade. /// /// Network upgrades can change the Zcash network protocol or consensus rules in /// incompatible ways. #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, Serialize, Deserialize)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum NetworkUpgrade { /// The Zcash protocol for a Genesis block. /// diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index b8bb3802f3d..f66ca9277c9 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -5,6 +5,10 @@ authors = ["Zcash Foundation "] license = "MIT OR Apache-2.0" edition = "2018" +[features] +default = [] +proptest-impl = ["proptest", "proptest-derive", "zebra-chain/proptest-impl"] + [dependencies] blake2b_simd = "0.5.11" bellman = "0.10.0" @@ -33,8 +37,13 @@ zebra-state = { path = "../zebra-state" } zebra-script = { path = "../zebra-script" } wagyu-zcash-parameters = "0.2.0" +proptest = { version = "0.10", optional = true } +proptest-derive = { version = "0.3.0", optional = true } + [dev-dependencies] color-eyre = "0.5.11" +proptest = "0.10" +proptest-derive = "0.3.0" rand07 = { package = "rand", version = "0.7" } spandoc = "0.2" tokio = { version = "0.3.6", features = ["full"] } diff --git a/zebra-consensus/src/error.rs b/zebra-consensus/src/error.rs index 499b679040f..333eff10a76 100644 --- a/zebra-consensus/src/error.rs +++ b/zebra-consensus/src/error.rs @@ -9,6 +9,9 @@ use thiserror::Error; use crate::BoxError; +#[cfg(any(test, feature = "proptest-impl"))] +use proptest_derive::Arbitrary; + #[derive(Error, Copy, Clone, Debug, PartialEq)] pub enum SubsidyError { #[error("no coinbase transaction in block")] @@ -19,6 +22,7 @@ pub enum SubsidyError { } #[derive(Error, Clone, Debug, PartialEq)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] pub enum TransactionError { #[error("first transaction must be coinbase")] CoinbasePosition, @@ -45,6 +49,7 @@ pub enum TransactionError { CoinbaseInMempool, #[error("coinbase transaction failed subsidy validation")] + #[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))] Subsidy(#[from] SubsidyError), #[error("transaction version number MUST be >= 4")] @@ -63,6 +68,7 @@ pub enum TransactionError { BadBalance, #[error("could not verify a transparent script")] + #[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))] Script(#[from] zebra_script::Error), #[error("spend description cv and rk MUST NOT be of small order")] @@ -76,12 +82,15 @@ pub enum TransactionError { #[error( "Sprout joinSplitSig MUST represent a valid signature under joinSplitPubKey of dataToBeSigned" )] + #[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))] Ed25519(#[from] zebra_chain::primitives::ed25519::Error), #[error("Sapling bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")] + #[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))] RedJubjub(zebra_chain::primitives::redjubjub::Error), #[error("Orchard bindingSig MUST represent a valid signature under the transaction binding validating key bvk of SigHash")] + #[cfg_attr(any(test, feature = "proptest-impl"), proptest(skip))] RedPallas(zebra_chain::primitives::redpallas::Error), // temporary error type until #1186 is fixed diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index 363d0d082af..ec8688131ca 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -61,6 +61,7 @@ proptest = "0.10" proptest-derive = "0.3" zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } +zebra-consensus = { path = "../zebra-consensus/", features = ["proptest-impl"] } zebra-state = { path = "../zebra-state", features = ["proptest-impl"] } zebra-test = { path = "../zebra-test" } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 282e149f280..44895768f13 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -102,12 +102,12 @@ impl StartCmd { let mempool = ServiceBuilder::new().buffer(20).service(mempool_service); setup_tx - .send((peer_set.clone(), address_book, mempool)) + .send((peer_set.clone(), address_book, mempool.clone())) .map_err(|_| eyre!("could not send setup data to inbound service"))?; select! { result = syncer.sync().fuse() => result, - _ = mempool::Crawler::spawn(peer_set, sync_status).fuse() => { + _ = mempool::Crawler::spawn(peer_set, mempool, sync_status).fuse() => { unreachable!("The mempool crawler only stops if it panics"); } } diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 28e45b13c5b..7840f59f78f 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -50,7 +50,7 @@ type TxVerifier = Buffer< >; type InboundTxDownloads = TxDownloads, Timeout, State>; -#[derive(Debug)] +#[derive(Debug, Eq, PartialEq)] #[allow(dead_code)] pub enum Request { TransactionIds, diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index 2619aa38ea2..17d6e21e7e4 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -8,9 +8,12 @@ use futures::{stream::FuturesUnordered, StreamExt}; use tokio::{task::JoinHandle, time::sleep}; use tower::{timeout::Timeout, BoxError, Service, ServiceExt}; -use zebra_network::{Request, Response}; +use zebra_network as zn; -use super::super::sync::SyncStatus; +use super::{ + super::{mempool, sync::SyncStatus}, + downloads::Gossip, +}; #[cfg(test)] mod tests; @@ -31,20 +34,30 @@ const RATE_LIMIT_DELAY: Duration = Duration::from_secs(75); const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6); /// The mempool transaction crawler. -pub struct Crawler { - peer_set: Timeout, +pub struct Crawler { + peer_set: Timeout, + mempool: Mempool, status: SyncStatus, } -impl Crawler +impl Crawler where - S: Service + Clone + Send + 'static, - S::Future: Send, + PeerSet: + Service + Clone + Send + 'static, + PeerSet::Future: Send, + Mempool: + Service + Send + 'static, + Mempool::Future: Send, { /// Spawn an asynchronous task to run the mempool crawler. - pub fn spawn(peer_set: S, status: SyncStatus) -> JoinHandle> { + pub fn spawn( + peer_set: PeerSet, + mempool: Mempool, + status: SyncStatus, + ) -> JoinHandle> { let crawler = Crawler { peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT), + mempool, status, }; @@ -79,13 +92,13 @@ where // end the task on permanent peer set errors let peer_set = peer_set.ready_and().await?; - requests.push(peer_set.call(Request::MempoolTransactionIds)); + requests.push(peer_set.call(zn::Request::MempoolTransactionIds)); } while let Some(result) = requests.next().await { // log individual response errors match result { - Ok(response) => self.handle_response(response).await, + Ok(response) => self.handle_response(response).await?, // TODO: Reduce the log level of the errors (#2655). Err(error) => info!("Failed to crawl peer for mempool transactions: {}", error), } @@ -95,9 +108,9 @@ where } /// Handle a peer's response to the crawler's request for transactions. - async fn handle_response(&mut self, response: Response) { - let transaction_ids = match response { - Response::TransactionIds(ids) => ids, + async fn handle_response(&mut self, response: zn::Response) -> Result<(), BoxError> { + let transaction_ids: Vec<_> = match response { + zn::Response::TransactionIds(ids) => ids.into_iter().map(Gossip::Id).collect(), _ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"), }; @@ -106,6 +119,37 @@ where transaction_ids.len() ); - // TODO: Send transaction IDs to the download and verify stream (#2650) + if !transaction_ids.is_empty() { + self.queue_transactions(transaction_ids).await?; + } + + Ok(()) + } + + /// Forward the crawled transactions IDs to the mempool transaction downloader. + async fn queue_transactions(&mut self, transaction_ids: Vec) -> Result<(), BoxError> { + let call_result = self + .mempool + .ready_and() + .await? + .call(mempool::Request::Queue(transaction_ids)) + .await; + + let queue_errors = match call_result { + Ok(mempool::Response::Queued(queue_results)) => { + queue_results.into_iter().filter_map(Result::err) + } + Ok(_) => unreachable!("Mempool did not respond with queue results to mempool crawler"), + Err(call_error) => { + debug!("Ignoring unexpected peer behavior: {}", call_error); + return Ok(()); + } + }; + + for error in queue_errors { + debug!("Failed to download a crawled transaction: {}", error); + } + + Ok(()) } } diff --git a/zebrad/src/components/mempool/crawler/tests.rs b/zebrad/src/components/mempool/crawler/tests.rs index 56d34b7cc41..7a97b72c496 100644 --- a/zebrad/src/components/mempool/crawler/tests.rs +++ b/zebrad/src/components/mempool/crawler/tests.rs @@ -1,12 +1,20 @@ use std::time::Duration; -use proptest::prelude::*; +use proptest::{collection::vec, prelude::*}; use tokio::time; -use zebra_network::{Request, Response}; -use zebra_test::mock_service::MockService; +use zebra_chain::transaction::UnminedTxId; +use zebra_network as zn; +use zebra_test::mock_service::{MockService, PropTestAssertion}; -use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY}; +use super::{ + super::{ + super::{mempool, sync::RecentSyncLengths}, + downloads::Gossip, + error::MempoolError, + }, + Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY, +}; /// The number of iterations to crawl while testing. /// @@ -16,10 +24,24 @@ use super::{Crawler, SyncStatus, FANOUT, RATE_LIMIT_DELAY}; /// [`MockServiceBuilder::with_max_request_delay`]). const CRAWL_ITERATIONS: usize = 4; +/// The maximum number of transactions crawled from a mocked peer. +const MAX_CRAWLED_TX: usize = 10; + /// The amount of time to advance beyond the expected instant that the crawler wakes up. const ERROR_MARGIN: Duration = Duration::from_millis(100); +/// A [`MockService`] representing the network service. +type MockPeerSet = MockService; + +/// A [`MockService`] representing the mempool service. +type MockMempool = MockService; + proptest! { + /// Test if crawler periodically crawls for transaction IDs. + /// + /// The crawler should periodically perform a fanned-out series of requests to obtain + /// transaction IDs from other peers. These requests should only be sent if the mempool is + /// enabled, i.e., if the block synchronizer is likely close to the chain tip. #[test] fn crawler_requests_for_transaction_ids(mut sync_lengths in any::>()) { let runtime = tokio::runtime::Builder::new_current_thread() @@ -32,25 +54,17 @@ proptest! { sync_lengths.push(0); runtime.block_on(async move { - let mut peer_set = MockService::build().for_prop_tests(); - let (sync_status, mut recent_sync_lengths) = SyncStatus::new(); + let (mut peer_set, _mempool, sync_status, mut recent_sync_lengths) = setup_crawler(); time::pause(); - Crawler::spawn(peer_set.clone(), sync_status.clone()); - for sync_length in sync_lengths { let mempool_is_enabled = sync_status.is_close_to_tip(); for _ in 0..CRAWL_ITERATIONS { for _ in 0..FANOUT { if mempool_is_enabled { - peer_set - .expect_request_that(|request| { - matches!(request, Request::MempoolTransactionIds) - }) - .await? - .respond(Response::TransactionIds(vec![])); + respond_with_transaction_ids(&mut peer_set, vec![]).await?; } else { peer_set.expect_no_requests().await?; } @@ -70,4 +84,241 @@ proptest! { Ok::<(), TestCaseError>(()) })?; } + + /// Test if crawled transactions are forwarded to the [`Mempool`][mempool::Mempool] service. + /// + /// The transaction IDs sent by other peers to the crawler should be forwarded to the + /// [`Mempool`][mempool::Mempool] service so that they can be downloaded, verified and added to + /// the mempool. + #[test] + fn crawled_transactions_are_forwarded_to_downloader( + transaction_ids in vec(any::(), 1..MAX_CRAWLED_TX), + ) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + let transaction_id_count = transaction_ids.len(); + + runtime.block_on(async move { + let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths) = + setup_crawler(); + + time::pause(); + + // Mock end of chain sync to enable the mempool crawler. + SyncStatus::sync_close_to_tip(&mut recent_sync_lengths); + + crawler_iteration(&mut peer_set, vec![transaction_ids.clone()]).await?; + + respond_to_queue_request( + &mut mempool, + transaction_ids, + vec![Ok(()); transaction_id_count], + ).await?; + + mempool.expect_no_requests().await?; + + Ok::<(), TestCaseError>(()) + })?; + } + + /// Test if errors while forwarding transaction IDs do not stop the crawler. + /// + /// The crawler should continue operating normally if some transactions fail to download or + /// even if the mempool service fails to enqueue the transactions to be downloaded. + #[test] + fn transaction_id_forwarding_errors_dont_stop_the_crawler( + service_call_error in any::(), + transaction_ids_for_call_failure in vec(any::(), 1..MAX_CRAWLED_TX), + transaction_ids_and_responses in + vec(any::<(UnminedTxId, Result<(), MempoolError>)>(), 1..MAX_CRAWLED_TX), + transaction_ids_for_return_to_normal in vec(any::(), 1..MAX_CRAWLED_TX), + ) { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + let _guard = runtime.enter(); + + runtime.block_on(async move { + let (mut peer_set, mut mempool, _sync_status, mut recent_sync_lengths) = + setup_crawler(); + + time::pause(); + + // Mock end of chain sync to enable the mempool crawler. + SyncStatus::sync_close_to_tip(&mut recent_sync_lengths); + + // Prepare to simulate download errors. + let download_result_count = transaction_ids_and_responses.len(); + let mut transaction_ids_for_download_errors = Vec::with_capacity(download_result_count); + let mut download_result_list = Vec::with_capacity(download_result_count); + + for (transaction_id, result) in transaction_ids_and_responses { + transaction_ids_for_download_errors.push(transaction_id); + download_result_list.push(result); + } + + // First crawl iteration: + // 1. Fails with a mempool call error + // 2. Some downloads fail + // Rest: no crawled transactions + crawler_iteration( + &mut peer_set, + vec![ + transaction_ids_for_call_failure.clone(), + transaction_ids_for_download_errors.clone(), + ], + ) + .await?; + + // First test with an error returned from the Mempool service. + respond_to_queue_request_with_error( + &mut mempool, + transaction_ids_for_call_failure, + service_call_error, + ).await?; + + // Then test a failure to download transactions. + respond_to_queue_request( + &mut mempool, + transaction_ids_for_download_errors, + download_result_list, + ).await?; + + mempool.expect_no_requests().await?; + + // Wait until next crawl iteration. + time::sleep(RATE_LIMIT_DELAY).await; + + // Second crawl iteration: + // The mempool should continue crawling normally. + crawler_iteration( + &mut peer_set, + vec![transaction_ids_for_return_to_normal.clone()], + ) + .await?; + + let response_list = vec![Ok(()); transaction_ids_for_return_to_normal.len()]; + + respond_to_queue_request( + &mut mempool, + transaction_ids_for_return_to_normal, + response_list, + ).await?; + + mempool.expect_no_requests().await?; + + Ok::<(), TestCaseError>(()) + })?; + } +} + +/// Spawn a crawler instance using mock services. +fn setup_crawler() -> (MockPeerSet, MockMempool, SyncStatus, RecentSyncLengths) { + let peer_set = MockService::build().for_prop_tests(); + let mempool = MockService::build().for_prop_tests(); + let (sync_status, recent_sync_lengths) = SyncStatus::new(); + + Crawler::spawn(peer_set.clone(), mempool.clone(), sync_status.clone()); + + (peer_set, mempool, sync_status, recent_sync_lengths) +} + +/// Intercept a request for mempool transaction IDs and respond with the `transaction_ids` list. +async fn respond_with_transaction_ids( + peer_set: &mut MockPeerSet, + transaction_ids: Vec, +) -> Result<(), TestCaseError> { + peer_set + .expect_request(zn::Request::MempoolTransactionIds) + .await? + .respond(zn::Response::TransactionIds(transaction_ids)); + + Ok(()) +} + +/// Intercept fanned-out requests for mempool transaction IDs and answer with the `responses`. +/// +/// Each item in `responses` is a list of transaction IDs to send back to a single request. +/// Therefore, each item represents the response sent by a peer in the network. +/// +/// If there are less items in `responses` the [`FANOUT`] number, then the remaining requests are +/// answered with an empty list of transaction IDs. +/// +/// # Panics +/// +/// If `responses` contains more items than the [`FANOUT`] number. +async fn crawler_iteration( + peer_set: &mut MockPeerSet, + responses: Vec>, +) -> Result<(), TestCaseError> { + let empty_responses = FANOUT + .checked_sub(responses.len()) + .expect("Too many responses to be sent in a single crawl iteration"); + + for response in responses { + respond_with_transaction_ids(peer_set, response).await?; + } + + for _ in 0..empty_responses { + respond_with_transaction_ids(peer_set, vec![]).await?; + } + + peer_set.expect_no_requests().await?; + + Ok(()) +} + +/// Intercept request for mempool to download and verify transactions. +/// +/// The intercepted request will be verified to check if it has the `expected_transaction_ids`, and +/// it will be answered with a list of results, one for each transaction requested to be +/// downloaded. +/// +/// # Panics +/// +/// If `response` and `expected_transaction_ids` have different sizes. +async fn respond_to_queue_request( + mempool: &mut MockMempool, + expected_transaction_ids: Vec, + response: Vec>, +) -> Result<(), TestCaseError> { + let request_parameter = expected_transaction_ids + .into_iter() + .map(Gossip::Id) + .collect(); + + mempool + .expect_request(mempool::Request::Queue(request_parameter)) + .await? + .respond(mempool::Response::Queued(response)); + + Ok(()) +} + +/// Intercept request for mempool to download and verify transactions, and answer with an error. +/// +/// The intercepted request will be verified to check if it has the `expected_transaction_ids`, and +/// it will be answered with `error`, as if the service had an internal failure that prevented it +/// from queuing the transactions for downloading. +async fn respond_to_queue_request_with_error( + mempool: &mut MockMempool, + expected_transaction_ids: Vec, + error: MempoolError, +) -> Result<(), TestCaseError> { + let request_parameter = expected_transaction_ids + .into_iter() + .map(Gossip::Id) + .collect(); + + mempool + .expect_request(mempool::Request::Queue(request_parameter)) + .await? + .respond(Err(error)); + + Ok(()) } diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index bb7d9b0aaec..d0714258875 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -66,7 +66,7 @@ pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT; pub(crate) const MAX_INBOUND_CONCURRENCY: usize = 10; /// A gossiped transaction, which can be the transaction itself or just its ID. -#[derive(Debug)] +#[derive(Debug, Eq, PartialEq)] pub enum Gossip { Id(UnminedTxId), Tx(UnminedTx), diff --git a/zebrad/src/components/mempool/error.rs b/zebrad/src/components/mempool/error.rs index 9ac91ba46f3..3c686898385 100644 --- a/zebrad/src/components/mempool/error.rs +++ b/zebrad/src/components/mempool/error.rs @@ -2,7 +2,11 @@ use thiserror::Error; +#[cfg(any(test, feature = "proptest-impl"))] +use proptest_derive::Arbitrary; + #[derive(Error, Clone, Debug, PartialEq)] +#[cfg_attr(any(test, feature = "proptest-impl"), derive(Arbitrary))] #[allow(dead_code)] pub enum MempoolError { #[error("transaction already exists in mempool")]