From f6ef6dcf86055e74ee62bf42f3882100826b69c9 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 15 Sep 2021 14:27:53 -0300 Subject: [PATCH 1/8] Cancel download and verify tasks when the mempool is deactivated --- zebrad/src/components/inbound/tests.rs | 5 +- zebrad/src/components/mempool.rs | 23 ++++ zebrad/src/components/mempool/downloads.rs | 22 ++++ zebrad/src/components/mempool/error.rs | 3 + zebrad/src/components/mempool/tests.rs | 145 ++++++++++++++++++++- zebrad/src/components/sync/status.rs | 18 +++ 6 files changed, 211 insertions(+), 5 deletions(-) diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index 2c1b4b2b57a..8dd066b0169 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -23,7 +23,7 @@ async fn mempool_requests_for_transactions() { let (peer_set, _) = mock_peer_set(); let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); let address_book = Arc::new(std::sync::Mutex::new(address_book)); - let (sync_status, _recent_syncs) = SyncStatus::new(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); let (state, _, _) = zebra_state::init(state_config, network); let state_service = ServiceBuilder::new().buffer(1).service(state); @@ -61,6 +61,9 @@ async fn mempool_requests_for_transactions() { // We can't expect or unwrap because the returned Result does not implement Debug assert!(r.is_ok()); + // Pretend we're close to tip to enable the mempool + SyncStatus::sync_close_to_tip(&mut recent_syncs); + // Test `Request::MempoolTransactionIds` let request = inbound_service .clone() diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index f6826d1b53c..2b7b3bae6b8 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -82,6 +82,9 @@ pub struct Mempool { /// Allows checking if we are near the tip to enable/disable the mempool. #[allow(dead_code)] sync_status: SyncStatus, + + /// Indicates whether the mempool is enabled or not. + enabled: bool, } impl Mempool { @@ -102,6 +105,7 @@ impl Mempool { storage: Default::default(), tx_downloads, sync_status, + enabled: false, } } @@ -111,6 +115,12 @@ impl Mempool { &mut self.storage } + /// Get the transaction downloader of the mempool for testing purposes. + #[cfg(test)] + pub fn tx_downloads(&self) -> &Pin> { + &self.tx_downloads + } + /// Check if transaction should be downloaded and/or verified. /// /// If it is already in the mempool (or in its rejected list) @@ -134,6 +144,16 @@ impl Service for Mempool { Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let is_close_to_tip = self.sync_status.is_close_to_tip(); + if self.enabled && !is_close_to_tip { + // Disable mempool + self.tx_downloads.cancel_all(); + self.enabled = false; + } else if !self.enabled && is_close_to_tip { + // Enable mempool + self.enabled = true; + } + // Clean up completed download tasks and add to mempool if successful while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) { if let Ok(tx) = r { @@ -146,6 +166,9 @@ impl Service for Mempool { #[instrument(name = "mempool", skip(self, req))] fn call(&mut self, req: Request) -> Self::Future { + if !self.enabled { + return async move { Err(MempoolError::Disabled.into()) }.boxed(); + } match req { Request::TransactionIds => { let res = self.storage.tx_ids(); diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 4c7efcdb553..4fd052b5901 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -315,6 +315,28 @@ where Ok(()) } + /// Cancel all running tasks and reset the downloader state. + // Note: copied from zebrad/src/components/sync/downloads.rs + pub fn cancel_all(&mut self) { + // Replace the pending task list with an empty one and drop it. + let _ = std::mem::take(&mut self.pending); + // Signal cancellation to all running tasks. + // Since we already dropped the JoinHandles above, they should + // fail silently. + for (_hash, cancel) in self.cancel_handles.drain() { + let _ = cancel.send(()); + } + assert!(self.pending.is_empty()); + assert!(self.cancel_handles.is_empty()); + } + + /// Get the number of currently in-flight download tasks. + // Note: copied from zebrad/src/components/sync/downloads.rs + #[allow(dead_code)] + pub fn in_flight(&self) -> usize { + self.pending.len() + } + /// Check if transaction is already in the state. async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> { // Check if the transaction is already in the state. diff --git a/zebrad/src/components/mempool/error.rs b/zebrad/src/components/mempool/error.rs index 9d5eb94990d..12598e53a34 100644 --- a/zebrad/src/components/mempool/error.rs +++ b/zebrad/src/components/mempool/error.rs @@ -43,4 +43,7 @@ pub enum MempoolError { /// The queue's capacity is [`super::downloads::MAX_INBOUND_CONCURRENCY`]. #[error("transaction dropped because the queue is full")] FullQueue, + + #[error("mempool is disabled since synchronization did not reach the tip")] + Disabled, } diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index 844eaf0e87b..07441ddf90f 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -16,7 +16,7 @@ async fn mempool_service_basic() -> Result<(), Report> { let consensus_config = ConsensusConfig::default(); let state_config = StateConfig::ephemeral(); let (peer_set, _) = mock_peer_set(); - let (sync_status, _recent_syncs) = SyncStatus::new(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); let (state, _, _) = zebra_state::init(state_config, network); let state_service = ServiceBuilder::new().buffer(1).service(state); @@ -37,6 +37,9 @@ async fn mempool_service_basic() -> Result<(), Report> { // Insert the genesis block coinbase transaction into the mempool storage. service.storage.insert(genesis_transactions.1[0].clone())?; + // Pretend we're close to tip to enable the mempool + SyncStatus::sync_close_to_tip(&mut recent_syncs); + // Test `Request::TransactionIds` let response = service .ready_and() @@ -75,14 +78,18 @@ async fn mempool_service_basic() -> Result<(), Report> { // Insert more transactions into the mempool storage. // This will cause the genesis transaction to be moved into rejected. - let more_transactions = unmined_transactions_in_blocks(10, network); - for tx in more_transactions.1.iter().skip(1) { + let (count, more_transactions) = unmined_transactions_in_blocks(10, network); + // Skip the first (used before) and the last (will be used later) + for tx in more_transactions.iter().skip(1).take(count - 2) { service.storage.insert(tx.clone())?; } // Test `Request::RejectedTransactionIds` let response = service - .oneshot(Request::RejectedTransactionIds( + .ready_and() + .await + .unwrap() + .call(Request::RejectedTransactionIds( genesis_transactions_hash_set, )) .await @@ -94,5 +101,135 @@ async fn mempool_service_basic() -> Result<(), Report> { assert_eq!(rejected_ids, genesis_transaction_ids); + // Test `Request::Queue` + // Use the ID of the last transaction in the list + let txid = more_transactions.last().unwrap().id; + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!(service.tx_downloads().in_flight(), 1); + + Ok(()) +} + +#[tokio::test] +async fn mempool_service_disabled() -> Result<(), Report> { + // Using the mainnet for now + let network = Network::Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let (peer_set, _) = mock_peer_set(); + let (sync_status, mut recent_syncs) = SyncStatus::new(); + + let (state, _, _) = zebra_state::init(state_config, network); + let state_service = ServiceBuilder::new().buffer(1).service(state); + let (_chain_verifier, tx_verifier) = + zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) + .await; + + // get the genesis block transactions from the Zcash blockchain. + let genesis_transactions = unmined_transactions_in_blocks(0, network); + // Start the mempool service + let mut service = Mempool::new( + network, + peer_set, + state_service.clone(), + tx_verifier, + sync_status, + ); + // Insert the genesis block coinbase transaction into the mempool storage. + service.storage.insert(genesis_transactions.1[0].clone())?; + + // Test if mempool is disabled (it should start disabled) + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await; + assert_eq!( + *response + .expect_err("mempool should return an error") + .downcast_ref::() + .expect("error must be MempoolError"), + MempoolError::Disabled, + "error must be MempoolError::Disabled" + ); + + // Pretend we're close to tip to enable the mempool + SyncStatus::sync_close_to_tip(&mut recent_syncs); + + // Test if the mempool answers correctly (i.e. is enabled) + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await + .unwrap(); + let _genesis_transaction_ids = match response { + Response::TransactionIds(ids) => ids, + _ => unreachable!("will never happen in this test"), + }; + + let (_count, more_transactions) = unmined_transactions_in_blocks(1, network); + + // Queue a transaction for download + // Use the ID of the last transaction in the list + let txid = more_transactions.last().unwrap().id; + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert!(queued_responses[0].is_ok()); + assert_eq!( + service.tx_downloads().in_flight(), + 1, + "Transaction must be queued for download" + ); + + // Pretend we're far from the tip to disable the mempool + SyncStatus::sync_far_from_tip(&mut recent_syncs); + + // Test if mempool is disabled again + let response = service + .ready_and() + .await + .unwrap() + .call(Request::TransactionIds) + .await; + assert_eq!( + *response + .expect_err("mempool should return an error") + .downcast_ref::() + .expect("error must be MempoolError"), + MempoolError::Disabled, + "error must be MempoolError::Disabled" + ); + + assert_eq!( + service.tx_downloads().in_flight(), + 0, + "Transaction download should have been cancelled" + ); + Ok(()) } diff --git a/zebrad/src/components/sync/status.rs b/zebrad/src/components/sync/status.rs index eac751b1f9c..4da8a17d8c2 100644 --- a/zebrad/src/components/sync/status.rs +++ b/zebrad/src/components/sync/status.rs @@ -69,4 +69,22 @@ impl SyncStatus { // average sync length falls below the threshold. avg < Self::MIN_DIST_FROM_TIP } + + /// Feed the given [`RecentSyncLengths`] it order to make the matching + /// [`SyncStatus`] report that it's close to the tip. + #[cfg(test)] + pub(crate) fn sync_close_to_tip(recent_syncs: &mut RecentSyncLengths) { + for _ in 0..RecentSyncLengths::MAX_RECENT_LENGTHS { + recent_syncs.push_extend_tips_length(1); + } + } + + /// Feed the given [`RecentSyncLengths`] it order to make the matching + /// [`SyncStatus`] report that it's not close to the tip. + #[cfg(test)] + pub(crate) fn sync_far_from_tip(recent_syncs: &mut RecentSyncLengths) { + for _ in 0..RecentSyncLengths::MAX_RECENT_LENGTHS { + recent_syncs.push_extend_tips_length(Self::MIN_DIST_FROM_TIP * 10); + } + } } From 5244db3464b54980eb074aee334c9f9774b0bcec Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 22 Sep 2021 17:15:10 -0300 Subject: [PATCH 2/8] Refactor enable/disable logic to use a state enum --- zebrad/src/components/inbound/tests.rs | 13 +- zebrad/src/components/mempool.rs | 229 ++++++++++++++------- zebrad/src/components/mempool/downloads.rs | 15 -- zebrad/src/components/mempool/error.rs | 2 +- zebrad/src/components/mempool/tests.rs | 95 +++++---- 5 files changed, 216 insertions(+), 138 deletions(-) diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index 6a12357b3e1..95ee8cf11d0 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -25,10 +25,9 @@ async fn mempool_requests_for_transactions() { let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none()); let address_book = Arc::new(std::sync::Mutex::new(address_book)); let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (_state_service, _latest_chain_tip, chain_tip_change) = + let (state, _latest_chain_tip, chain_tip_change) = zebra_state::init(state_config.clone(), network); - let (state, _, _) = zebra_state::init(state_config, network); let state_service = ServiceBuilder::new().buffer(1).service(state); let (block_verifier, transaction_verifier) = @@ -48,6 +47,11 @@ async fn mempool_requests_for_transactions() { chain_tip_change, ); + // Pretend we're close to tip to enable the mempool + SyncStatus::sync_close_to_tip(&mut recent_syncs); + // Wait for the mempool to make it enable itself + let _ = mempool_service.ready_and().await; + let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network); let added_transaction_ids: Vec = added_transactions.iter().map(|t| t.id).collect(); @@ -65,13 +69,10 @@ async fn mempool_requests_for_transactions() { block_verifier.clone(), )); - let r = setup_tx.send((peer_set_service, address_book, mempool)); + let r = setup_tx.send((peer_set_service, address_book, mempool.clone())); // We can't expect or unwrap because the returned Result does not implement Debug assert!(r.is_ok()); - // Pretend we're close to tip to enable the mempool - SyncStatus::sync_close_to_tip(&mut recent_syncs); - // Test `Request::MempoolTransactionIds` let request = inbound_service .clone() diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 3577a990e27..676377742cf 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -3,6 +3,7 @@ use std::{ collections::HashSet, future::Future, + iter, pin::Pin, task::{Context, Poll}, }; @@ -40,13 +41,14 @@ use self::downloads::{ use super::sync::SyncStatus; -type Outbound = Buffer, zn::Request>; -type State = Buffer, zs::Request>; -type TxVerifier = Buffer< +type OutboundService = Buffer, zn::Request>; +type StateService = Buffer, zs::Request>; +type TxVerifierService = Buffer< BoxService, transaction::Request, >; -type InboundTxDownloads = TxDownloads, Timeout, State>; +type InboundTxDownloads = + TxDownloads, Timeout, StateService>; #[derive(Debug)] #[allow(dead_code)] @@ -65,20 +67,33 @@ pub enum Response { Queued(Vec>), } +/// The state of the mempool. +/// +/// Indicates wether it is enabled or disabled and, if enabled, contains +/// the necessary data to run it. +enum State { + /// The Mempool is disabled. + Disabled, + /// The Mempool is enabled. + Enabled { + /// The Mempool storage itself. + /// + /// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to + /// inject transactions into `storage`, as transactions must be verified beforehand. + storage: storage::Storage, + /// The transaction dowload and verify stream. + tx_downloads: Pin>, + }, +} + /// Mempool async management and query service. /// /// The mempool is the set of all verified transactions that this node is aware /// of that have yet to be confirmed by the Zcash network. A transaction is /// confirmed when it has been included in a block ('mined'). pub struct Mempool { - /// The Mempool storage itself. - /// - /// ##: Correctness: only components internal to the [`Mempool`] struct are allowed to - /// inject transactions into `storage`, as transactions must be verified beforehand. - storage: storage::Storage, - - /// The transaction dowload and verify stream. - tx_downloads: Pin>, + /// The state of the mempool. + state: State, /// Allows checking if we are near the tip to enable/disable the mempool. #[allow(dead_code)] @@ -88,57 +103,99 @@ pub struct Mempool { #[allow(dead_code)] chain_tip_change: ChainTipChange, - /// Indicates whether the mempool is enabled or not. - enabled: bool, + /// Handle to the outbound service. + /// Used to construct the transaction downloader. + outbound_service: OutboundService, + + /// Handle to the state service. + /// Used to construct the transaction downloader. + state_service: StateService, + + /// Handle to the transaction verifier service. + /// Used to construct the transaction downloader. + tx_verifier_service: TxVerifierService, } impl Mempool { #[allow(dead_code)] pub(crate) fn new( _network: Network, - outbound: Outbound, - state: State, - tx_verifier: TxVerifier, + outbound_service: OutboundService, + state_service: StateService, + tx_verifier_service: TxVerifierService, sync_status: SyncStatus, chain_tip_change: ChainTipChange, ) -> Self { - let tx_downloads = Box::pin(TxDownloads::new( - Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT), - Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT), - state, - )); - Mempool { - storage: Default::default(), - tx_downloads, + state: State::Disabled, sync_status, chain_tip_change, - enabled: false, + outbound_service, + state_service, + tx_verifier_service, + } + } + + /// Update the mempool state (enabled / disabled) depending on how close to + /// the tip is the synchronization, including side effects to state changes. + fn update_state(&mut self) { + // Update enabled / disabled state + let is_close_to_tip = self.sync_status.is_close_to_tip(); + if is_close_to_tip && matches!(self.state, State::Disabled) { + let tx_downloads = Box::pin(TxDownloads::new( + Timeout::new(self.outbound_service.clone(), TRANSACTION_DOWNLOAD_TIMEOUT), + Timeout::new(self.tx_verifier_service.clone(), TRANSACTION_VERIFY_TIMEOUT), + self.state_service.clone(), + )); + self.state = State::Enabled { + storage: Default::default(), + tx_downloads, + }; + } else if !is_close_to_tip && matches!(self.state, State::Enabled { .. }) { + self.state = State::Disabled + } + } + + /// Return wether the mempool is enabled or not. + #[allow(dead_code)] + pub fn enabled(&self) -> bool { + match self.state { + State::Disabled => false, + State::Enabled { .. } => true, } } /// Get the storage field of the mempool for testing purposes. #[cfg(test)] pub fn storage(&mut self) -> &mut storage::Storage { - &mut self.storage + match &mut self.state { + State::Disabled => panic!("mempool must be enabled"), + State::Enabled { storage, .. } => storage, + } } - /// Get the transaction downloader of the mempool for testing purposes. + /// Get the transaction downloader of the mempool for testing purposes. #[cfg(test)] pub fn tx_downloads(&self) -> &Pin> { - &self.tx_downloads + match &self.state { + State::Disabled => panic!("mempool must be enabled"), + State::Enabled { tx_downloads, .. } => tx_downloads, + } } /// Check if transaction should be downloaded and/or verified. /// /// If it is already in the mempool (or in its rejected list) /// then it shouldn't be downloaded/verified. - fn should_download_or_verify(&mut self, txid: UnminedTxId) -> Result<(), MempoolError> { + fn should_download_or_verify( + storage: &mut storage::Storage, + txid: UnminedTxId, + ) -> Result<(), MempoolError> { // Check if the transaction is already in the mempool. - if self.storage.contains(&txid) { + if storage.contains(&txid) { return Err(MempoolError::InMempool); } - if self.storage.contains_rejected(&txid) { + if storage.contains_rejected(&txid) { return Err(MempoolError::Rejected); } Ok(()) @@ -152,21 +209,25 @@ impl Service for Mempool { Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let is_close_to_tip = self.sync_status.is_close_to_tip(); - if self.enabled && !is_close_to_tip { - // Disable mempool - self.tx_downloads.cancel_all(); - self.enabled = false; - } else if !self.enabled && is_close_to_tip { - // Enable mempool - self.enabled = true; - } + self.update_state(); - // Clean up completed download tasks and add to mempool if successful - while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) { - if let Ok(tx) = r { - // TODO: should we do something with the result? - let _ = self.storage.insert(tx); + match &mut self.state { + State::Enabled { + storage, + tx_downloads, + } => { + // Clean up completed download tasks and add to mempool if successful + while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { + if let Ok(tx) = r { + // TODO: should we do something with the result? + let _ = storage.insert(tx); + } + } + } + State::Disabled => { + // When the mempool is disabled we still return that the service is ready. + // Otherwise, callers could block waiting for the mempool to be enabled, + // which may not be the desired behaviour. } } Poll::Ready(Ok(())) @@ -174,34 +235,54 @@ impl Service for Mempool { #[instrument(name = "mempool", skip(self, req))] fn call(&mut self, req: Request) -> Self::Future { - if !self.enabled { - return async move { Err(MempoolError::Disabled.into()) }.boxed(); - } - match req { - Request::TransactionIds => { - let res = self.storage.tx_ids(); - async move { Ok(Response::TransactionIds(res)) }.boxed() - } - Request::TransactionsById(ids) => { - let rsp = Ok(self.storage.transactions(ids)).map(Response::Transactions); - async move { rsp }.boxed() - } - Request::RejectedTransactionIds(ids) => { - let rsp = Ok(self.storage.rejected_transactions(ids)) - .map(Response::RejectedTransactionIds); - async move { rsp }.boxed() - } - Request::Queue(gossiped_txs) => { - let rsp: Vec> = gossiped_txs - .into_iter() - .map(|gossiped_tx| { - self.should_download_or_verify(gossiped_tx.id())?; - self.tx_downloads - .download_if_needed_and_verify(gossiped_tx)?; - Ok(()) - }) - .collect(); - async move { Ok(Response::Queued(rsp)) }.boxed() + match &mut self.state { + State::Enabled { + storage, + tx_downloads, + } => match req { + Request::TransactionIds => { + let res = storage.tx_ids(); + async move { Ok(Response::TransactionIds(res)) }.boxed() + } + Request::TransactionsById(ids) => { + let rsp = Ok(storage.transactions(ids)).map(Response::Transactions); + async move { rsp }.boxed() + } + Request::RejectedTransactionIds(ids) => { + let rsp = Ok(storage.rejected_transactions(ids)) + .map(Response::RejectedTransactionIds); + async move { rsp }.boxed() + } + Request::Queue(gossiped_txs) => { + let rsp: Vec> = gossiped_txs + .into_iter() + .map(|gossiped_tx| { + Self::should_download_or_verify(storage, gossiped_tx.id())?; + tx_downloads.download_if_needed_and_verify(gossiped_tx)?; + Ok(()) + }) + .collect(); + async move { Ok(Response::Queued(rsp)) }.boxed() + } + }, + State::Disabled => { + // We can't return an error since that will cause a disconnection + // by the peer connection handler. Therefore, return successful + // empty responses. + let resp = match req { + Request::TransactionIds => Response::TransactionIds(Default::default()), + Request::TransactionsById(_) => Response::Transactions(Default::default()), + Request::RejectedTransactionIds(_) => { + Response::RejectedTransactionIds(Default::default()) + } + // Special case; we can signal the error inside the response. + Request::Queue(gossiped_txs) => Response::Queued( + iter::repeat(Err(MempoolError::Disabled)) + .take(gossiped_txs.len()) + .collect(), + ), + }; + async move { Ok(resp) }.boxed() } } } diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 4fd052b5901..44636988543 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -315,21 +315,6 @@ where Ok(()) } - /// Cancel all running tasks and reset the downloader state. - // Note: copied from zebrad/src/components/sync/downloads.rs - pub fn cancel_all(&mut self) { - // Replace the pending task list with an empty one and drop it. - let _ = std::mem::take(&mut self.pending); - // Signal cancellation to all running tasks. - // Since we already dropped the JoinHandles above, they should - // fail silently. - for (_hash, cancel) in self.cancel_handles.drain() { - let _ = cancel.send(()); - } - assert!(self.pending.is_empty()); - assert!(self.cancel_handles.is_empty()); - } - /// Get the number of currently in-flight download tasks. // Note: copied from zebrad/src/components/sync/downloads.rs #[allow(dead_code)] diff --git a/zebrad/src/components/mempool/error.rs b/zebrad/src/components/mempool/error.rs index 12598e53a34..992c891d789 100644 --- a/zebrad/src/components/mempool/error.rs +++ b/zebrad/src/components/mempool/error.rs @@ -44,6 +44,6 @@ pub enum MempoolError { #[error("transaction dropped because the queue is full")] FullQueue, - #[error("mempool is disabled since synchronization did not reach the tip")] + #[error("mempool is disabled since synchronization is behind the chain tip")] Disabled, } diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index ef4b1a9cf01..507769517b0 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -16,10 +16,8 @@ async fn mempool_service_basic() -> Result<(), Report> { let state_config = StateConfig::ephemeral(); let peer_set = MockService::build().for_unit_tests(); let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (_state_service, _latest_chain_tip, chain_tip_change) = - zebra_state::init(state_config.clone(), network); - let (state, _, _) = zebra_state::init(state_config, network); + let (state, _latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network); let state_service = ServiceBuilder::new().buffer(1).service(state); let (_chain_verifier, tx_verifier) = zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) @@ -36,11 +34,16 @@ async fn mempool_service_basic() -> Result<(), Report> { sync_status, chain_tip_change, ); - // Insert the genesis block coinbase transaction into the mempool storage. - service.storage.insert(genesis_transactions.1[0].clone())?; // Pretend we're close to tip to enable the mempool SyncStatus::sync_close_to_tip(&mut recent_syncs); + // Wait for the mempool to make it enable itself + let _ = service.ready_and().await; + + // Insert the genesis block coinbase transaction into the mempool storage. + service + .storage() + .insert(genesis_transactions.1[0].clone())?; // Test `Request::TransactionIds` let response = service @@ -83,7 +86,7 @@ async fn mempool_service_basic() -> Result<(), Report> { let (count, more_transactions) = unmined_transactions_in_blocks(10, network); // Skip the first (used before) and the last (will be used later) for tx in more_transactions.iter().skip(1).take(count - 2) { - service.storage.insert(tx.clone())?; + service.storage().insert(tx.clone())?; } // Test `Request::RejectedTransactionIds` @@ -133,7 +136,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { let peer_set = MockService::build().for_unit_tests(); let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, _, chain_tip_change) = zebra_state::init(state_config, network); + let (state, _latest_chain_tip, chain_tip_change) = zebra_state::init(state_config, network); let state_service = ServiceBuilder::new().buffer(1).service(state); let (_chain_verifier, tx_verifier) = zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) @@ -150,27 +153,21 @@ async fn mempool_service_disabled() -> Result<(), Report> { sync_status, chain_tip_change, ); - // Insert the genesis block coinbase transaction into the mempool storage. - service.storage.insert(genesis_transactions.1[0].clone())?; // Test if mempool is disabled (it should start disabled) - let response = service - .ready_and() - .await - .unwrap() - .call(Request::TransactionIds) - .await; - assert_eq!( - *response - .expect_err("mempool should return an error") - .downcast_ref::() - .expect("error must be MempoolError"), - MempoolError::Disabled, - "error must be MempoolError::Disabled" - ); + assert!(!service.enabled()); // Pretend we're close to tip to enable the mempool SyncStatus::sync_close_to_tip(&mut recent_syncs); + // Wait for the mempool to make it enable itself + let _ = service.ready_and().await; + + assert!(service.enabled()); + + // Insert the genesis block coinbase transaction into the mempool storage. + service + .storage() + .insert(genesis_transactions.1[0].clone())?; // Test if the mempool answers correctly (i.e. is enabled) let response = service @@ -203,36 +200,50 @@ async fn mempool_service_disabled() -> Result<(), Report> { }; assert_eq!(queued_responses.len(), 1); assert!(queued_responses[0].is_ok()); - assert_eq!( - service.tx_downloads().in_flight(), - 1, - "Transaction must be queued for download" - ); + assert_eq!(service.tx_downloads().in_flight(), 1); // Pretend we're far from the tip to disable the mempool SyncStatus::sync_far_from_tip(&mut recent_syncs); + // Wait for the mempool to make it disable itself + let _ = service.ready_and().await; // Test if mempool is disabled again + assert!(!service.enabled()); + + // Test if the mempool returns no transactions when disabled let response = service .ready_and() .await .unwrap() .call(Request::TransactionIds) - .await; - assert_eq!( - *response - .expect_err("mempool should return an error") - .downcast_ref::() - .expect("error must be MempoolError"), - MempoolError::Disabled, - "error must be MempoolError::Disabled" - ); + .await + .unwrap(); + match response { + Response::TransactionIds(ids) => { + assert_eq!( + ids.len(), + 0, + "mempool should return no transactions when disabled" + ) + } + _ => unreachable!("will never happen in this test"), + }; - assert_eq!( - service.tx_downloads().in_flight(), - 0, - "Transaction download should have been cancelled" - ); + // Test if the mempool returns to Queue requests correctly when disabled + let txid = more_transactions.last().unwrap().id; + let response = service + .ready_and() + .await + .unwrap() + .call(Request::Queue(vec![txid.into()])) + .await + .unwrap(); + let queued_responses = match response { + Response::Queued(queue_responses) => queue_responses, + _ => unreachable!("will never happen in this test"), + }; + assert_eq!(queued_responses.len(), 1); + assert_eq!(queued_responses[0], Err(MempoolError::Disabled)); Ok(()) } From de0364f3848cb0f23dcee992e371436c5a1238bb Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 22 Sep 2021 18:14:59 -0300 Subject: [PATCH 3/8] Add helper test functions to enable/disable the mempool --- zebrad/src/components/inbound/tests.rs | 6 ++---- zebrad/src/components/mempool.rs | 22 ++++++++++++++++++++++ zebrad/src/components/mempool/tests.rs | 18 ++++++------------ zebrad/src/components/sync.rs | 2 +- 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index 95ee8cf11d0..222633cebc6 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -47,10 +47,8 @@ async fn mempool_requests_for_transactions() { chain_tip_change, ); - // Pretend we're close to tip to enable the mempool - SyncStatus::sync_close_to_tip(&mut recent_syncs); - // Wait for the mempool to make it enable itself - let _ = mempool_service.ready_and().await; + // Enable the mempool + let _ = mempool_service.enable(&mut recent_syncs).await; let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network); let added_transaction_ids: Vec = added_transactions.iter().map(|t| t.id).collect(); diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 676377742cf..ace1e8ef991 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -39,6 +39,8 @@ use self::downloads::{ Downloads as TxDownloads, Gossip, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT, }; +#[cfg(test)] +use super::sync::RecentSyncLengths; use super::sync::SyncStatus; type OutboundService = Buffer, zn::Request>; @@ -183,6 +185,26 @@ impl Mempool { } } + /// Enable the mempool by pretending the synchronization is close to the tip. + #[cfg(test)] + pub async fn enable(&mut self, recent_syncs: &mut RecentSyncLengths) { + use tower::ServiceExt; + // Pretend we're close to tip + SyncStatus::sync_close_to_tip(recent_syncs); + // Wait for the mempool to make it enable itself + let _ = self.ready_and().await; + } + + /// Disable the mempool by pretending the synchronization is far from the tip. + #[cfg(test)] + pub async fn disable(&mut self, recent_syncs: &mut RecentSyncLengths) { + use tower::ServiceExt; + // Pretend we're far from the tip + SyncStatus::sync_far_from_tip(recent_syncs); + // Wait for the mempool to make it enable itself + let _ = self.ready_and().await; + } + /// Check if transaction should be downloaded and/or verified. /// /// If it is already in the mempool (or in its rejected list) diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index 507769517b0..9d6b5ee9ce4 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -35,10 +35,8 @@ async fn mempool_service_basic() -> Result<(), Report> { chain_tip_change, ); - // Pretend we're close to tip to enable the mempool - SyncStatus::sync_close_to_tip(&mut recent_syncs); - // Wait for the mempool to make it enable itself - let _ = service.ready_and().await; + // Enable the mempool + let _ = service.enable(&mut recent_syncs).await; // Insert the genesis block coinbase transaction into the mempool storage. service @@ -157,10 +155,8 @@ async fn mempool_service_disabled() -> Result<(), Report> { // Test if mempool is disabled (it should start disabled) assert!(!service.enabled()); - // Pretend we're close to tip to enable the mempool - SyncStatus::sync_close_to_tip(&mut recent_syncs); - // Wait for the mempool to make it enable itself - let _ = service.ready_and().await; + // Enable the mempool + let _ = service.enable(&mut recent_syncs).await; assert!(service.enabled()); @@ -202,10 +198,8 @@ async fn mempool_service_disabled() -> Result<(), Report> { assert!(queued_responses[0].is_ok()); assert_eq!(service.tx_downloads().in_flight(), 1); - // Pretend we're far from the tip to disable the mempool - SyncStatus::sync_far_from_tip(&mut recent_syncs); - // Wait for the mempool to make it disable itself - let _ = service.ready_and().await; + // Disable the mempool + let _ = service.disable(&mut recent_syncs).await; // Test if mempool is disabled again assert!(!service.enabled()); diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 67f04757d67..435d1ea0a46 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -28,7 +28,7 @@ mod status; mod tests; use downloads::{AlwaysHedge, Downloads}; -use recent_sync_lengths::RecentSyncLengths; +pub use recent_sync_lengths::RecentSyncLengths; pub use status::SyncStatus; /// Controls the number of peers used for each ObtainTips and ExtendTips request. From 976c19ead5ddbb03e16506b82508143d684ddeaf Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Wed, 22 Sep 2021 18:19:51 -0300 Subject: [PATCH 4/8] Add documentation about errors on service calls --- zebrad/src/components/inbound.rs | 4 ++++ zebrad/src/components/mempool.rs | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index f68d3a5b5d3..608db652906 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -237,6 +237,10 @@ impl Service for Inbound { Poll::Ready(result) } + /// Call the inbound service. + /// + /// Errors indicate that the peer has done something wrong or unexpected, + /// and will cause callers to disconnect from the remote peer. #[instrument(name = "inbound", skip(self, req))] fn call(&mut self, req: zn::Request) -> Self::Future { match req { diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index ace1e8ef991..1db4d301d0c 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -255,6 +255,10 @@ impl Service for Mempool { Poll::Ready(Ok(())) } + /// Call the mempool service. + /// + /// Errors indicate that the peer has done something wrong or unexpected, + /// and will cause callers to disconnect from the remote peer. #[instrument(name = "mempool", skip(self, req))] fn call(&mut self, req: Request) -> Self::Future { match &mut self.state { From fe354f51eaea9b5a5bd141d35c4f8c1212dbcfcd Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Thu, 23 Sep 2021 17:09:08 -0300 Subject: [PATCH 5/8] Improvements from review --- zebrad/src/components/mempool.rs | 90 ++++++++++++++------------ zebrad/src/components/mempool/tests.rs | 6 +- 2 files changed, 50 insertions(+), 46 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 5911921a2d2..14dde786f6f 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -43,14 +43,13 @@ use self::downloads::{ use super::sync::RecentSyncLengths; use super::sync::SyncStatus; -type OutboundService = Buffer, zn::Request>; -type StateService = Buffer, zs::Request>; -type TxVerifierService = Buffer< +type Outbound = Buffer, zn::Request>; +type State = Buffer, zs::Request>; +type TxVerifier = Buffer< BoxService, transaction::Request, >; -type InboundTxDownloads = - TxDownloads, Timeout, StateService>; +type InboundTxDownloads = TxDownloads, Timeout, State>; #[derive(Debug)] #[allow(dead_code)] @@ -73,7 +72,7 @@ pub enum Response { /// /// Indicates wether it is enabled or disabled and, if enabled, contains /// the necessary data to run it. -enum State { +enum ActiveState { /// The Mempool is disabled. Disabled, /// The Mempool is enabled. @@ -95,7 +94,7 @@ enum State { /// confirmed when it has been included in a block ('mined'). pub struct Mempool { /// The state of the mempool. - state: State, + active_state: ActiveState, /// Allows checking if we are near the tip to enable/disable the mempool. #[allow(dead_code)] @@ -107,81 +106,86 @@ pub struct Mempool { /// Handle to the outbound service. /// Used to construct the transaction downloader. - outbound_service: OutboundService, + outbound: Outbound, /// Handle to the state service. /// Used to construct the transaction downloader. - state_service: StateService, + state: State, /// Handle to the transaction verifier service. /// Used to construct the transaction downloader. - tx_verifier_service: TxVerifierService, + tx_verifier: TxVerifier, } impl Mempool { #[allow(dead_code)] pub(crate) fn new( _network: Network, - outbound_service: OutboundService, - state_service: StateService, - tx_verifier_service: TxVerifierService, + outbound: Outbound, + state: State, + tx_verifier: TxVerifier, sync_status: SyncStatus, chain_tip_change: ChainTipChange, ) -> Self { Mempool { - state: State::Disabled, + active_state: ActiveState::Disabled, sync_status, chain_tip_change, - outbound_service, - state_service, - tx_verifier_service, + outbound, + state, + tx_verifier, } } /// Update the mempool state (enabled / disabled) depending on how close to /// the tip is the synchronization, including side effects to state changes. fn update_state(&mut self) { - // Update enabled / disabled state let is_close_to_tip = self.sync_status.is_close_to_tip(); - if is_close_to_tip && matches!(self.state, State::Disabled) { + if self.is_enabled() == is_close_to_tip { + // the active state is up to date + return; + } + + // Update enabled / disabled state + if is_close_to_tip { let tx_downloads = Box::pin(TxDownloads::new( - Timeout::new(self.outbound_service.clone(), TRANSACTION_DOWNLOAD_TIMEOUT), - Timeout::new(self.tx_verifier_service.clone(), TRANSACTION_VERIFY_TIMEOUT), - self.state_service.clone(), + Timeout::new(self.outbound.clone(), TRANSACTION_DOWNLOAD_TIMEOUT), + Timeout::new(self.tx_verifier.clone(), TRANSACTION_VERIFY_TIMEOUT), + self.state.clone(), )); - self.state = State::Enabled { + self.active_state = ActiveState::Enabled { storage: Default::default(), tx_downloads, }; - } else if !is_close_to_tip && matches!(self.state, State::Enabled { .. }) { - self.state = State::Disabled + } else { + self.active_state = ActiveState::Disabled } } /// Return wether the mempool is enabled or not. #[allow(dead_code)] - pub fn enabled(&self) -> bool { - match self.state { - State::Disabled => false, - State::Enabled { .. } => true, + pub fn is_enabled(&self) -> bool { + match self.active_state { + ActiveState::Disabled => false, + ActiveState::Enabled { .. } => true, } } /// Get the storage field of the mempool for testing purposes. #[cfg(test)] pub fn storage(&mut self) -> &mut storage::Storage { - match &mut self.state { - State::Disabled => panic!("mempool must be enabled"), - State::Enabled { storage, .. } => storage, + match &mut self.active_state { + ActiveState::Disabled => panic!("mempool must be enabled"), + ActiveState::Enabled { storage, .. } => storage, } } /// Get the transaction downloader of the mempool for testing purposes. #[cfg(test)] pub fn tx_downloads(&self) -> &Pin> { - match &self.state { - State::Disabled => panic!("mempool must be enabled"), - State::Enabled { tx_downloads, .. } => tx_downloads, + match &self.active_state { + ActiveState::Disabled => panic!("mempool must be enabled"), + ActiveState::Enabled { tx_downloads, .. } => tx_downloads, } } @@ -192,7 +196,7 @@ impl Mempool { // Pretend we're close to tip SyncStatus::sync_close_to_tip(recent_syncs); // Wait for the mempool to make it enable itself - let _ = self.ready_and().await; + let _ = self.oneshot(Request::TransactionIds).await; } /// Disable the mempool by pretending the synchronization is far from the tip. @@ -202,7 +206,7 @@ impl Mempool { // Pretend we're far from the tip SyncStatus::sync_far_from_tip(recent_syncs); // Wait for the mempool to make it enable itself - let _ = self.ready_and().await; + let _ = self.oneshot(Request::TransactionIds).await; } /// Check if transaction should be downloaded and/or verified. @@ -233,8 +237,8 @@ impl Service for Mempool { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.update_state(); - match &mut self.state { - State::Enabled { + match &mut self.active_state { + ActiveState::Enabled { storage, tx_downloads, } => { @@ -251,7 +255,7 @@ impl Service for Mempool { } } } - State::Disabled => { + ActiveState::Disabled => { // When the mempool is disabled we still return that the service is ready. // Otherwise, callers could block waiting for the mempool to be enabled, // which may not be the desired behaviour. @@ -266,8 +270,8 @@ impl Service for Mempool { /// and will cause callers to disconnect from the remote peer. #[instrument(name = "mempool", skip(self, req))] fn call(&mut self, req: Request) -> Self::Future { - match &mut self.state { - State::Enabled { + match &mut self.active_state { + ActiveState::Enabled { storage, tx_downloads, } => match req { @@ -296,7 +300,7 @@ impl Service for Mempool { async move { Ok(Response::Queued(rsp)) }.boxed() } }, - State::Disabled => { + ActiveState::Disabled => { // We can't return an error since that will cause a disconnection // by the peer connection handler. Therefore, return successful // empty responses. diff --git a/zebrad/src/components/mempool/tests.rs b/zebrad/src/components/mempool/tests.rs index 6232eebf45e..cbf14009611 100644 --- a/zebrad/src/components/mempool/tests.rs +++ b/zebrad/src/components/mempool/tests.rs @@ -160,12 +160,12 @@ async fn mempool_service_disabled() -> Result<(), Report> { ); // Test if mempool is disabled (it should start disabled) - assert!(!service.enabled()); + assert!(!service.is_enabled()); // Enable the mempool let _ = service.enable(&mut recent_syncs).await; - assert!(service.enabled()); + assert!(service.is_enabled()); // Insert the genesis block coinbase transaction into the mempool storage. service.storage().insert(genesis_transaction.clone())?; @@ -205,7 +205,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { let _ = service.disable(&mut recent_syncs).await; // Test if mempool is disabled again - assert!(!service.enabled()); + assert!(!service.is_enabled()); // Test if the mempool returns no transactions when disabled let response = service From 397b08e4a40545153f7788aad3ce67e68732ba64 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Thu, 23 Sep 2021 17:11:36 -0300 Subject: [PATCH 6/8] Improve documentation --- zebrad/src/components/mempool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 14dde786f6f..03934f95620 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -195,7 +195,7 @@ impl Mempool { use tower::ServiceExt; // Pretend we're close to tip SyncStatus::sync_close_to_tip(recent_syncs); - // Wait for the mempool to make it enable itself + // Make a dummy request to poll the mempool and make it enable itself let _ = self.oneshot(Request::TransactionIds).await; } @@ -205,7 +205,7 @@ impl Mempool { use tower::ServiceExt; // Pretend we're far from the tip SyncStatus::sync_far_from_tip(recent_syncs); - // Wait for the mempool to make it enable itself + // Make a dummy request to poll the mempool and make it disable itself let _ = self.oneshot(Request::TransactionIds).await; } From f20c46a548dccf011d6a0b87343611ad80c0cb3a Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Fri, 24 Sep 2021 16:07:45 -0300 Subject: [PATCH 7/8] Fix bug in test --- zebrad/src/components/inbound/tests.rs | 47 +++++++++++++++----------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index a66086a1f81..41169df2cd8 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -6,7 +6,8 @@ use crate::components::sync::SyncStatus; use futures::FutureExt; use tokio::sync::oneshot; use tower::{ - buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, ServiceExt, + buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, Service, + ServiceExt, }; use tracing::Span; @@ -34,15 +35,16 @@ async fn mempool_requests_for_transactions() { .collect(); // Test `Request::MempoolTransactionIds` - let request = inbound_service + let response = inbound_service .clone() .oneshot(Request::MempoolTransactionIds) .await; - match request { + match response { Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids), - _ => unreachable!( - "`MempoolTransactionIds` requests should always respond `Ok(Vec)`" - ), + _ => unreachable!(format!( + "`MempoolTransactionIds` requests should always respond `Ok(Vec)`, got {:?}", + response + )), }; // Test `Request::TransactionsById` @@ -51,11 +53,11 @@ async fn mempool_requests_for_transactions() { .copied() .collect::>(); - let request = inbound_service + let response = inbound_service .oneshot(Request::TransactionsById(hash_set)) .await; - match request { + match response { Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions.unwrap()), _ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec)`"), }; @@ -188,7 +190,7 @@ async fn setup( let (state, _latest_chain_tip, chain_tip_change) = zebra_state::init(state_config.clone(), network); - let state_service = ServiceBuilder::new().buffer(1).service(state); + let mut state_service = ServiceBuilder::new().buffer(1).service(state); let (block_verifier, _transaction_verifier) = zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone()) @@ -200,6 +202,22 @@ async fn setup( let mock_tx_verifier = MockService::build().for_unit_tests(); let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10); + // Push the genesis block to the state. + // This must be done before creating the mempool to avoid `chain_tip_change` + // returning "reset" which would clear the mempool. + let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES + .zcash_deserialize_into() + .unwrap(); + state_service + .ready_and() + .await + .unwrap() + .call(zebra_state::Request::CommitFinalizedBlock( + genesis_block.clone().into(), + )) + .await + .unwrap(); + let mut mempool_service = Mempool::new( network, buffered_peer_set.clone(), @@ -235,17 +253,6 @@ async fn setup( // We can't expect or unwrap because the returned Result does not implement Debug assert!(r.is_ok()); - // Push the genesis block to the state - let genesis_block: Arc = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES - .zcash_deserialize_into() - .unwrap(); - state_service - .oneshot(zebra_state::Request::CommitFinalizedBlock( - genesis_block.clone().into(), - )) - .await - .unwrap(); - ( inbound_service, added_transactions, From e2fd12b0f9a79da08056bfb69992af5ee1ffd385 Mon Sep 17 00:00:00 2001 From: Conrado Gouvea Date: Tue, 28 Sep 2021 13:50:18 -0300 Subject: [PATCH 8/8] Apply suggestions from code review Co-authored-by: teor --- zebrad/src/components/mempool.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 03934f95620..0787e1d7cca 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -162,8 +162,7 @@ impl Mempool { } } - /// Return wether the mempool is enabled or not. - #[allow(dead_code)] + /// Return whether the mempool is enabled or not. pub fn is_enabled(&self) -> bool { match self.active_state { ActiveState::Disabled => false, @@ -250,7 +249,8 @@ impl Service for Mempool { // Clean up completed download tasks and add to mempool if successful while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { if let Ok(tx) = r { - // TODO: should we do something with the result? + // Storage handles conflicting transactions or a full mempool internally, + // so just ignore the storage result here let _ = storage.insert(tx); } }