Skip to content

Commit

Permalink
Cancel download and verify tasks when the mempool is deactivated (#2764)
Browse files Browse the repository at this point in the history
* Cancel download and verify tasks when the mempool is deactivated

* Refactor enable/disable logic to use a state enum

* Add helper test functions to enable/disable the mempool

* Add documentation about errors on service calls

* Improvements from review

* Improve documentation

* Fix bug in test

* Apply suggestions from code review

Co-authored-by: teor <[email protected]>

Co-authored-by: teor <[email protected]>
  • Loading branch information
conradoplg and teor2345 authored Sep 28, 2021
1 parent 1601c9f commit c6878d9
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 88 deletions.
4 changes: 4 additions & 0 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl Service<zn::Request> for Inbound {
Poll::Ready(result)
}

/// Call the inbound service.
///
/// Errors indicate that the peer has done something wrong or unexpected,
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "inbound", skip(self, req))]
fn call(&mut self, req: zn::Request) -> Self::Future {
match req {
Expand Down
55 changes: 32 additions & 23 deletions zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use crate::components::sync::SyncStatus;
use futures::FutureExt;
use tokio::sync::oneshot;
use tower::{
buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, ServiceExt,
buffer::Buffer, builder::ServiceBuilder, load_shed::LoadShed, util::BoxService, Service,
ServiceExt,
};

use tracing::Span;
Expand Down Expand Up @@ -34,15 +35,16 @@ async fn mempool_requests_for_transactions() {
.collect();

// Test `Request::MempoolTransactionIds`
let request = inbound_service
let response = inbound_service
.clone()
.oneshot(Request::MempoolTransactionIds)
.await;
match request {
match response {
Ok(Response::TransactionIds(response)) => assert_eq!(response, added_transaction_ids),
_ => unreachable!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`"
),
_ => unreachable!(format!(
"`MempoolTransactionIds` requests should always respond `Ok(Vec<UnminedTxId>)`, got {:?}",
response
)),
};

// Test `Request::TransactionsById`
Expand All @@ -51,11 +53,11 @@ async fn mempool_requests_for_transactions() {
.copied()
.collect::<HashSet<_>>();

let request = inbound_service
let response = inbound_service
.oneshot(Request::TransactionsById(hash_set))
.await;

match request {
match response {
Ok(Response::Transactions(response)) => assert_eq!(response, added_transactions.unwrap()),
_ => unreachable!("`TransactionsById` requests should always respond `Ok(Vec<UnminedTx>)`"),
};
Expand Down Expand Up @@ -184,12 +186,11 @@ async fn setup(
let state_config = StateConfig::ephemeral();
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let address_book = Arc::new(std::sync::Mutex::new(address_book));
let (sync_status, _recent_syncs) = SyncStatus::new();
let (_state_service, _latest_chain_tip, chain_tip_change) =
let (sync_status, mut recent_syncs) = SyncStatus::new();
let (state, _latest_chain_tip, chain_tip_change) =
zebra_state::init(state_config.clone(), network);

let (state, _, _) = zebra_state::init(state_config, network);
let state_service = ServiceBuilder::new().buffer(1).service(state);
let mut state_service = ServiceBuilder::new().buffer(1).service(state);

let (block_verifier, _transaction_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
Expand All @@ -201,6 +202,22 @@ async fn setup(
let mock_tx_verifier = MockService::build().for_unit_tests();
let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 10);

// Push the genesis block to the state.
// This must be done before creating the mempool to avoid `chain_tip_change`
// returning "reset" which would clear the mempool.
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.ready_and()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();

let mut mempool_service = Mempool::new(
network,
buffered_peer_set.clone(),
Expand All @@ -210,6 +227,9 @@ async fn setup(
chain_tip_change,
);

// Enable the mempool
let _ = mempool_service.enable(&mut recent_syncs).await;

let mut added_transactions = None;
if add_transactions {
added_transactions = Some(add_some_stuff_to_mempool(&mut mempool_service, network));
Expand All @@ -233,17 +253,6 @@ async fn setup(
// We can't expect or unwrap because the returned Result does not implement Debug
assert!(r.is_ok());

// Push the genesis block to the state
let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
state_service
.oneshot(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();

(
inbound_service,
added_transactions,
Expand Down
Loading

0 comments on commit c6878d9

Please sign in to comment.