Skip to content

Commit

Permalink
Cancel download and verify tasks when the mempool is deactivated
Browse files Browse the repository at this point in the history
  • Loading branch information
conradoplg committed Sep 16, 2021
1 parent 957e12e commit f6ef6dc
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 5 deletions.
5 changes: 4 additions & 1 deletion zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn mempool_requests_for_transactions() {
let (peer_set, _) = mock_peer_set();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, _recent_syncs) = SyncStatus::new();
let (sync_status, mut recent_syncs) = SyncStatus::new();

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
Expand Down Expand Up @@ -61,6 +61,9 @@ async fn mempool_requests_for_transactions() {
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok());

// Pretend we're close to tip to enable the mempool
SyncStatus::sync_close_to_tip(&mut recent_syncs);

// Test `Request::MempoolTransactionIds`
let request = inbound_service
.clone()
Expand Down
23 changes: 23 additions & 0 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ pub struct Mempool {
/// Allows checking if we are near the tip to enable/disable the mempool.
#[allow(dead_code)]
sync_status: SyncStatus,

/// Indicates whether the mempool is enabled or not.
enabled: bool,
}

impl Mempool {
Expand All @@ -102,6 +105,7 @@ impl Mempool {
storage: Default::default(),
tx_downloads,
sync_status,
enabled: false,
}
}

Expand All @@ -111,6 +115,12 @@ impl Mempool {
&mut self.storage
}

/// Get the transaction downloader of the mempool for testing purposes.
#[cfg(test)]
pub fn tx_downloads(&self) -> &Pin<Box<InboundTxDownloads>> {
&self.tx_downloads
}

/// Check if transaction should be downloaded and/or verified.
///
/// If it is already in the mempool (or in its rejected list)
Expand All @@ -134,6 +144,16 @@ impl Service<Request> for Mempool {
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let is_close_to_tip = self.sync_status.is_close_to_tip();
if self.enabled && !is_close_to_tip {
// Disable mempool
self.tx_downloads.cancel_all();
self.enabled = false;
} else if !self.enabled && is_close_to_tip {
// Enable mempool
self.enabled = true;
}

// Clean up completed download tasks and add to mempool if successful
while let Poll::Ready(Some(r)) = self.tx_downloads.as_mut().poll_next(cx) {
if let Ok(tx) = r {
Expand All @@ -146,6 +166,9 @@ impl Service<Request> for Mempool {

#[instrument(name = "mempool", skip(self, req))]
fn call(&mut self, req: Request) -> Self::Future {
if !self.enabled {
return async move { Err(MempoolError::Disabled.into()) }.boxed();
}
match req {
Request::TransactionIds => {
let res = self.storage.tx_ids();
Expand Down
22 changes: 22 additions & 0 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,28 @@ where
Ok(())
}

/// Cancel all running tasks and reset the downloader state.
// Note: copied from zebrad/src/components/sync/downloads.rs
pub fn cancel_all(&mut self) {
// Replace the pending task list with an empty one and drop it.
let _ = std::mem::take(&mut self.pending);
// Signal cancellation to all running tasks.
// Since we already dropped the JoinHandles above, they should
// fail silently.
for (_hash, cancel) in self.cancel_handles.drain() {
let _ = cancel.send(());
}
assert!(self.pending.is_empty());
assert!(self.cancel_handles.is_empty());
}

/// Get the number of currently in-flight download tasks.
// Note: copied from zebrad/src/components/sync/downloads.rs
#[allow(dead_code)]
pub fn in_flight(&self) -> usize {
self.pending.len()
}

/// Check if transaction is already in the state.
async fn transaction_in_state(state: &mut ZS, txid: UnminedTxId) -> Result<(), BoxError> {
// Check if the transaction is already in the state.
Expand Down
3 changes: 3 additions & 0 deletions zebrad/src/components/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ pub enum MempoolError {
/// The queue's capacity is [`super::downloads::MAX_INBOUND_CONCURRENCY`].
#[error("transaction dropped because the queue is full")]
FullQueue,

#[error("mempool is disabled since synchronization did not reach the tip")]
Disabled,
}
145 changes: 141 additions & 4 deletions zebrad/src/components/mempool/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn mempool_service_basic() -> Result<(), Report> {
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let (peer_set, _) = mock_peer_set();
let (sync_status, _recent_syncs) = SyncStatus::new();
let (sync_status, mut recent_syncs) = SyncStatus::new();

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
Expand All @@ -37,6 +37,9 @@ async fn mempool_service_basic() -> Result<(), Report> {
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage.insert(genesis_transactions.1[0].clone())?;

// Pretend we're close to tip to enable the mempool
SyncStatus::sync_close_to_tip(&mut recent_syncs);

// Test `Request::TransactionIds`
let response = service
.ready_and()
Expand Down Expand Up @@ -75,14 +78,18 @@ async fn mempool_service_basic() -> Result<(), Report> {

// Insert more transactions into the mempool storage.
// This will cause the genesis transaction to be moved into rejected.
let more_transactions = unmined_transactions_in_blocks(10, network);
for tx in more_transactions.1.iter().skip(1) {
let (count, more_transactions) = unmined_transactions_in_blocks(10, network);
// Skip the first (used before) and the last (will be used later)
for tx in more_transactions.iter().skip(1).take(count - 2) {
service.storage.insert(tx.clone())?;
}

// Test `Request::RejectedTransactionIds`
let response = service
.oneshot(Request::RejectedTransactionIds(
.ready_and()
.await
.unwrap()
.call(Request::RejectedTransactionIds(
genesis_transactions_hash_set,
))
.await
Expand All @@ -94,5 +101,135 @@ async fn mempool_service_basic() -> Result<(), Report> {

assert_eq!(rejected_ids, genesis_transaction_ids);

// Test `Request::Queue`
// Use the ID of the last transaction in the list
let txid = more_transactions.last().unwrap().id;
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(service.tx_downloads().in_flight(), 1);

Ok(())
}

#[tokio::test]
async fn mempool_service_disabled() -> Result<(), Report> {
// Using the mainnet for now
let network = Network::Mainnet;
let consensus_config = ConsensusConfig::default();
let state_config = StateConfig::ephemeral();
let (peer_set, _) = mock_peer_set();
let (sync_status, mut recent_syncs) = SyncStatus::new();

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let (_chain_verifier, tx_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;

// get the genesis block transactions from the Zcash blockchain.
let genesis_transactions = unmined_transactions_in_blocks(0, network);
// Start the mempool service
let mut service = Mempool::new(
network,
peer_set,
state_service.clone(),
tx_verifier,
sync_status,
);
// Insert the genesis block coinbase transaction into the mempool storage.
service.storage.insert(genesis_transactions.1[0].clone())?;

// Test if mempool is disabled (it should start disabled)
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await;
assert_eq!(
*response
.expect_err("mempool should return an error")
.downcast_ref::<MempoolError>()
.expect("error must be MempoolError"),
MempoolError::Disabled,
"error must be MempoolError::Disabled"
);

// Pretend we're close to tip to enable the mempool
SyncStatus::sync_close_to_tip(&mut recent_syncs);

// Test if the mempool answers correctly (i.e. is enabled)
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await
.unwrap();
let _genesis_transaction_ids = match response {
Response::TransactionIds(ids) => ids,
_ => unreachable!("will never happen in this test"),
};

let (_count, more_transactions) = unmined_transactions_in_blocks(1, network);

// Queue a transaction for download
// Use the ID of the last transaction in the list
let txid = more_transactions.last().unwrap().id;
let response = service
.ready_and()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(
service.tx_downloads().in_flight(),
1,
"Transaction must be queued for download"
);

// Pretend we're far from the tip to disable the mempool
SyncStatus::sync_far_from_tip(&mut recent_syncs);

// Test if mempool is disabled again
let response = service
.ready_and()
.await
.unwrap()
.call(Request::TransactionIds)
.await;
assert_eq!(
*response
.expect_err("mempool should return an error")
.downcast_ref::<MempoolError>()
.expect("error must be MempoolError"),
MempoolError::Disabled,
"error must be MempoolError::Disabled"
);

assert_eq!(
service.tx_downloads().in_flight(),
0,
"Transaction download should have been cancelled"
);

Ok(())
}
18 changes: 18 additions & 0 deletions zebrad/src/components/sync/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,22 @@ impl SyncStatus {
// average sync length falls below the threshold.
avg < Self::MIN_DIST_FROM_TIP
}

/// Feed the given [`RecentSyncLengths`] it order to make the matching
/// [`SyncStatus`] report that it's close to the tip.
#[cfg(test)]
pub(crate) fn sync_close_to_tip(recent_syncs: &mut RecentSyncLengths) {
for _ in 0..RecentSyncLengths::MAX_RECENT_LENGTHS {
recent_syncs.push_extend_tips_length(1);
}
}

/// Feed the given [`RecentSyncLengths`] it order to make the matching
/// [`SyncStatus`] report that it's not close to the tip.
#[cfg(test)]
pub(crate) fn sync_far_from_tip(recent_syncs: &mut RecentSyncLengths) {
for _ in 0..RecentSyncLengths::MAX_RECENT_LENGTHS {
recent_syncs.push_extend_tips_length(Self::MIN_DIST_FROM_TIP * 10);
}
}
}

0 comments on commit f6ef6dc

Please sign in to comment.