diff --git a/zebra-chain/src/block/hash.rs b/zebra-chain/src/block/hash.rs index ccf3217ea6f..045265ad69c 100644 --- a/zebra-chain/src/block/hash.rs +++ b/zebra-chain/src/block/hash.rs @@ -29,11 +29,22 @@ impl Hash { /// /// Zebra displays transaction and block hashes in big-endian byte-order, /// following the u256 convention set by Bitcoin and zcashd. - fn bytes_in_display_order(&self) -> [u8; 32] { + pub fn bytes_in_display_order(&self) -> [u8; 32] { let mut reversed_bytes = self.0; reversed_bytes.reverse(); reversed_bytes } + + /// Convert bytes in big-endian byte-order into a [`block::Hash`](crate::block::Hash). + /// + /// Zebra displays transaction and block hashes in big-endian byte-order, + /// following the u256 convention set by Bitcoin and zcashd. + pub fn from_bytes_in_display_order(bytes_in_display_order: &[u8; 32]) -> Hash { + let mut internal_byte_order = *bytes_in_display_order; + internal_byte_order.reverse(); + + Hash(internal_byte_order) + } } impl fmt::Display for Hash { diff --git a/zebra-chain/src/transaction/hash.rs b/zebra-chain/src/transaction/hash.rs index 68a0fb78694..2525c1d955c 100644 --- a/zebra-chain/src/transaction/hash.rs +++ b/zebra-chain/src/transaction/hash.rs @@ -109,11 +109,22 @@ impl Hash { /// /// Zebra displays transaction and block hashes in big-endian byte-order, /// following the u256 convention set by Bitcoin and zcashd. - fn bytes_in_display_order(&self) -> [u8; 32] { + pub fn bytes_in_display_order(&self) -> [u8; 32] { let mut reversed_bytes = self.0; reversed_bytes.reverse(); reversed_bytes } + + /// Convert bytes in big-endian byte-order into a [`transaction::Hash`](crate::transaction::Hash). + /// + /// Zebra displays transaction and block hashes in big-endian byte-order, + /// following the u256 convention set by Bitcoin and zcashd. + pub fn from_bytes_in_display_order(bytes_in_display_order: &[u8; 32]) -> Hash { + let mut internal_byte_order = *bytes_in_display_order; + internal_byte_order.reverse(); + + Hash(internal_byte_order) + } } impl ToHex for &Hash { diff --git a/zebra-consensus/src/transaction.rs b/zebra-consensus/src/transaction.rs index ef95ae28fa6..f992bdda3d8 100644 --- a/zebra-consensus/src/transaction.rs +++ b/zebra-consensus/src/transaction.rs @@ -305,7 +305,7 @@ where let span = tracing::debug_span!("tx", ?tx_id); async move { - tracing::trace!(?req); + tracing::trace!(?tx_id, ?req, "got tx verify request"); // Do basic checks first if let Some(block_time) = req.block_time() { @@ -344,6 +344,8 @@ where check::spend_conflicts(&tx)?; + tracing::trace!(?tx_id, "passed quick checks"); + // "The consensus rules applied to valueBalance, vShieldedOutput, and bindingSig // in non-coinbase transactions MUST also be applied to coinbase transactions." // @@ -360,6 +362,9 @@ where let cached_ffi_transaction = Arc::new(CachedFfiTransaction::new(tx.clone(), spent_outputs)); + + tracing::trace!(?tx_id, "got state UTXOs"); + let async_checks = match tx.as_ref() { Transaction::V1 { .. } | Transaction::V2 { .. } | Transaction::V3 { .. } => { tracing::debug!(?tx, "got transaction with wrong version"); @@ -391,10 +396,14 @@ where )?, }; + tracing::trace!(?tx_id, "awaiting async checks..."); + // If the Groth16 parameter download hangs, // Zebra will timeout here, waiting for the async checks. async_checks.check().await?; + tracing::trace!(?tx_id, "finished async checks"); + // Get the `value_balance` to calculate the transaction fee. let value_balance = tx.value_balance(&spent_utxos); @@ -429,6 +438,10 @@ where Ok(rsp) } + .inspect(move |result| { + // Hide the transaction data to avoid filling the logs + tracing::trace!(?tx_id, result = ?result.as_ref().map(|_tx| ()), "got tx verify result"); + }) .instrument(span) .boxed() } diff --git a/zebra-node-services/src/mempool/gossip.rs b/zebra-node-services/src/mempool/gossip.rs index 2e344893a57..51b48968e12 100644 --- a/zebra-node-services/src/mempool/gossip.rs +++ b/zebra-node-services/src/mempool/gossip.rs @@ -3,7 +3,7 @@ use zebra_chain::transaction::{UnminedTx, UnminedTxId}; /// A gossiped transaction, which can be the transaction itself or just its ID. -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum Gossip { /// Just the ID of an unmined transaction. Id(UnminedTxId), @@ -20,6 +20,14 @@ impl Gossip { Gossip::Tx(tx) => tx.id, } } + + /// Return the [`UnminedTx`] of a gossiped transaction, if we have it. + pub fn tx(&self) -> Option { + match self { + Gossip::Id(_) => None, + Gossip::Tx(tx) => Some(tx.clone()), + } + } } impl From for Gossip { diff --git a/zebra-rpc/src/config.rs b/zebra-rpc/src/config.rs index d8cb4127492..c08b7da9a2a 100644 --- a/zebra-rpc/src/config.rs +++ b/zebra-rpc/src/config.rs @@ -49,6 +49,10 @@ pub struct Config { /// /// If some of those instances are outdated or failed, RPC queries can be slow or inconsistent. pub parallel_cpu_threads: usize, + + /// Test-only option that makes Zebra say it is at the chain tip, + /// no matter what the estimated height or local clock is. + pub debug_force_finished_sync: bool, } impl Default for Config { @@ -59,6 +63,9 @@ impl Default for Config { // Use a single thread, so we can detect RPC port conflicts. parallel_cpu_threads: 1, + + // Debug options are always off by default. + debug_force_finished_sync: false, } } } diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index e9ce2c889ae..dcd8c91a835 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -243,9 +243,20 @@ where >, Tip: ChainTip, { + // Configuration + // /// Zebra's application version. app_version: String, + /// The configured network for this RPC service. + network: Network, + + /// Test-only option that makes Zebra say it is at the chain tip, + /// no matter what the estimated height or local clock is. + debug_force_finished_sync: bool, + + // Services + // /// A handle to the mempool service. mempool: Buffer, @@ -255,10 +266,8 @@ where /// Allows efficient access to the best tip of the blockchain. latest_chain_tip: Tip, - /// The configured network for this RPC service. - #[allow(dead_code)] - network: Network, - + // Tasks + // /// A sender component of a channel used to send transactions to the queue. queue_sender: Sender>, } @@ -279,10 +288,11 @@ where /// Create a new instance of the RPC handler. pub fn new( app_version: Version, + network: Network, + debug_force_finished_sync: bool, mempool: Buffer, state: State, latest_chain_tip: Tip, - network: Network, ) -> (Self, JoinHandle<()>) where Version: ToString, @@ -300,10 +310,11 @@ where let rpc_impl = RpcImpl { app_version, + network, + debug_force_finished_sync, mempool: mempool.clone(), state: state.clone(), latest_chain_tip: latest_chain_tip.clone(), - network, queue_sender: runner.sender(), }; @@ -379,13 +390,18 @@ where data: None, })?; - let estimated_height = + let mut estimated_height = if current_block_time > Utc::now() || zebra_estimated_height < tip_height { tip_height } else { zebra_estimated_height }; + // If we're testing the mempool, force the estimated height to be the actual tip height. + if self.debug_force_finished_sync { + estimated_height = tip_height; + } + // `upgrades` object // // Get the network upgrades in height order, like `zcashd`. @@ -506,6 +522,8 @@ where "mempool service returned more results than expected" ); + tracing::debug!("sent transaction to mempool: {:?}", &queue_results[0]); + match &queue_results[0] { Ok(()) => Ok(SentTransactionHash(transaction_hash)), Err(error) => Err(Error { diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index f4152658bed..01692c343d0 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -41,10 +41,11 @@ proptest! { let mut state: MockService<_, _, _, BoxError> = MockService::build().for_prop_tests(); let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let hash = SentTransactionHash(transaction.hash()); @@ -93,10 +94,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let transaction_bytes = transaction @@ -150,10 +152,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let transaction_bytes = transaction @@ -215,10 +218,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let send_task = tokio::spawn(rpc.send_raw_transaction(non_hex_string)); @@ -269,10 +273,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let send_task = tokio::spawn(rpc.send_raw_transaction(hex::encode(random_bytes))); @@ -321,10 +326,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let call_task = tokio::spawn(rpc.get_raw_mempool()); @@ -376,10 +382,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let send_task = tokio::spawn(rpc.get_raw_transaction(non_hex_string, 0)); @@ -432,10 +439,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let send_task = tokio::spawn(rpc.get_raw_transaction(hex::encode(random_bytes), 0)); @@ -477,10 +485,11 @@ proptest! { // look for an error with a `NoChainTip` let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + network, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - network, ); let response = rpc.get_blockchain_info(); @@ -525,10 +534,11 @@ proptest! { // Start RPC with the mocked `ChainTip` let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + network, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), chain_tip, - network, ); let response = rpc.get_blockchain_info(); @@ -609,10 +619,11 @@ proptest! { runtime.block_on(async move { let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + network, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), chain_tip, - network, ); // Build the future to call the RPC @@ -670,10 +681,11 @@ proptest! { runtime.block_on(async move { let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + network, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), chain_tip, - network, ); let address_strings = AddressStrings { @@ -719,10 +731,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); // send a transaction @@ -806,10 +819,11 @@ proptest! { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let mut transactions_hash_set = HashSet::new(); diff --git a/zebra-rpc/src/methods/tests/snapshot.rs b/zebra-rpc/src/methods/tests/snapshot.rs index 48e9d71a942..d9e1a8f4277 100644 --- a/zebra-rpc/src/methods/tests/snapshot.rs +++ b/zebra-rpc/src/methods/tests/snapshot.rs @@ -44,10 +44,11 @@ async fn test_rpc_response_data_for_network(network: Network) { // Init RPC let (rpc, _rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + network, + false, Buffer::new(mempool.clone(), 1), read_state, latest_chain_tip, - network, ); // Start snapshots of RPC responses. diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 2e276aa6d6e..114602ca7d9 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -29,10 +29,11 @@ async fn rpc_getinfo() { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); let get_info = rpc.get_info().expect("We should have a GetInfo struct"); @@ -71,10 +72,11 @@ async fn rpc_getblock() { // Init RPC let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), read_state, latest_chain_tip, - Mainnet, ); // Make calls with verbosity=0 and check response @@ -123,10 +125,11 @@ async fn rpc_getblock_parse_error() { // Init RPC let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); // Make sure we get an error if Zebra can't parse the block height. @@ -153,10 +156,11 @@ async fn rpc_getblock_missing_error() { // Init RPC let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); // Make sure Zebra returns the correct error code `-8` for missing blocks @@ -219,10 +223,11 @@ async fn rpc_getbestblockhash() { // Init RPC let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), read_state, latest_chain_tip, - Mainnet, ); // Get the tip hash using RPC method `get_best_block_hash` @@ -259,10 +264,11 @@ async fn rpc_getrawtransaction() { // Init RPC let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), read_state, latest_chain_tip, - Mainnet, ); // Test case where transaction is in mempool. @@ -344,10 +350,11 @@ async fn rpc_getaddresstxids_invalid_arguments() { let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(read_state.clone(), 1), latest_chain_tip, - Mainnet, ); // call the method with an invalid address string @@ -485,10 +492,11 @@ async fn rpc_getaddresstxids_response_with( let (rpc, rpc_tx_queue_task_handle) = RpcImpl::new( "RPC test", + network, + false, Buffer::new(mempool.clone(), 1), Buffer::new(read_state.clone(), 1), latest_chain_tip, - network, ); // call the method with valid arguments @@ -534,10 +542,11 @@ async fn rpc_getaddressutxos_invalid_arguments() { let rpc = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(state.clone(), 1), NoChainTip, - Mainnet, ); // call the method with an invalid address string @@ -583,10 +592,11 @@ async fn rpc_getaddressutxos_response() { let rpc = RpcImpl::new( "RPC test", + Mainnet, + false, Buffer::new(mempool.clone(), 1), Buffer::new(read_state.clone(), 1), latest_chain_tip, - Mainnet, ); // call the method with a valid address diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index b8cd316a352..b49460502a9 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -65,8 +65,14 @@ impl RpcServer { info!("Trying to open RPC endpoint at {}...", listen_addr,); // Initialize the rpc methods with the zebra version - let (rpc_impl, rpc_tx_queue_task_handle) = - RpcImpl::new(app_version, mempool, state, latest_chain_tip, network); + let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new( + app_version, + network, + config.debug_force_finished_sync, + mempool, + state, + latest_chain_tip, + ); // Create handler compatible with V1 and V2 RPC protocols let mut io: MetaIoHandler<(), _> = diff --git a/zebra-rpc/src/server/tests/vectors.rs b/zebra-rpc/src/server/tests/vectors.rs index 8b2e77ff042..23747361e0f 100644 --- a/zebra-rpc/src/server/tests/vectors.rs +++ b/zebra-rpc/src/server/tests/vectors.rs @@ -38,6 +38,7 @@ fn rpc_server_spawn(parallel_cpu_threads: bool) { let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), parallel_cpu_threads: if parallel_cpu_threads { 2 } else { 1 }, + debug_force_finished_sync: false, }; let rt = tokio::runtime::Runtime::new().unwrap(); @@ -102,6 +103,7 @@ fn rpc_server_spawn_unallocated_port(parallel_cpu_threads: bool) { let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), parallel_cpu_threads: if parallel_cpu_threads { 0 } else { 1 }, + debug_force_finished_sync: false, }; let rt = tokio::runtime::Runtime::new().unwrap(); @@ -155,6 +157,7 @@ fn rpc_server_spawn_port_conflict() { let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), parallel_cpu_threads: 1, + debug_force_finished_sync: false, }; let rt = tokio::runtime::Runtime::new().unwrap(); @@ -250,6 +253,7 @@ fn rpc_server_spawn_port_conflict_parallel_auto() { let config = Config { listen_addr: Some(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into()), parallel_cpu_threads: 2, + debug_force_finished_sync: false, }; let rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/zebrad/src/components/mempool.rs b/zebrad/src/components/mempool.rs index 844624bb92e..6202aadd532 100644 --- a/zebrad/src/components/mempool.rs +++ b/zebrad/src/components/mempool.rs @@ -291,6 +291,8 @@ impl Service for Mempool { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let is_state_changed = self.update_state(); + tracing::trace!(is_enabled = ?self.is_enabled(), ?is_state_changed, "started polling the mempool..."); + // When the mempool is disabled we still return that the service is ready. // Otherwise, callers could block waiting for the mempool to be enabled. if !self.is_enabled() { @@ -338,21 +340,31 @@ impl Service for Mempool { while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) { match r { Ok(tx) => { - if let Ok(inserted_id) = storage.insert(tx.clone()) { + let insert_result = storage.insert(tx.clone()); + + tracing::trace!( + ?insert_result, + "got Ok(_) transaction verify, tried to store", + ); + + if let Ok(inserted_id) = insert_result { // Save transaction ids that we will send to peers send_to_peers_ids.insert(inserted_id); } } - Err((txid, e)) => { - metrics::counter!("mempool.failed.verify.tasks.total", 1, "reason" => e.to_string()); - storage.reject_if_needed(txid, e); - // TODO: should we also log the result? + Err((txid, error)) => { + tracing::debug!(?txid, ?error, "mempool transaction failed to verify"); + + metrics::counter!("mempool.failed.verify.tasks.total", 1, "reason" => error.to_string()); + storage.reject_if_needed(txid, error); } }; } // Handle best chain tip changes if let Some(TipAction::Grow { block }) = tip_action { + tracing::trace!(block_height = ?block.height, "handling blocks added to tip"); + // Cancel downloads/verifications/storage of transactions // with the same mined IDs as recently mined transactions. let mined_ids = block.transaction_hashes.iter().cloned().collect(); @@ -367,10 +379,19 @@ impl Service for Mempool { // Remove transactions that are expired from the peers list send_to_peers_ids = Self::remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions); + + if !expired_transactions.is_empty() { + tracing::debug!( + ?expired_transactions, + "removed expired transactions from the mempool", + ); + } } // Send transactions that were not rejected nor expired to peers if !send_to_peers_ids.is_empty() { + tracing::trace!(?send_to_peers_ids, "sending new transactions to peers"); + self.transaction_sender.send(send_to_peers_ids)?; } } @@ -391,29 +412,59 @@ impl Service for Mempool { } => match req { // Queries Request::TransactionIds => { - let res = storage.tx_ids().collect(); + trace!(?req, "got mempool request"); + + let res: HashSet<_> = storage.tx_ids().collect(); + + // This log line is checked by tests, + // because lightwalletd doesn't return mempool transactions at the moment. + // + // TODO: downgrade to trace level when we can check transactions via gRPC + info!(?req, res_count = ?res.len(), "answered mempool request"); + async move { Ok(Response::TransactionIds(res)) }.boxed() } - Request::TransactionsById(ids) => { - let res = storage.transactions_exact(ids).cloned().collect(); + Request::TransactionsById(ref ids) => { + trace!(?req, "got mempool request"); + + let res: Vec<_> = storage.transactions_exact(ids.clone()).cloned().collect(); + + trace!(?req, res_count = ?res.len(), "answered mempool request"); + async move { Ok(Response::Transactions(res)) }.boxed() } - Request::TransactionsByMinedId(ids) => { - let res = storage.transactions_same_effects(ids).cloned().collect(); + Request::TransactionsByMinedId(ref ids) => { + trace!(?req, "got mempool request"); + + let res: Vec<_> = storage + .transactions_same_effects(ids.clone()) + .cloned() + .collect(); + + trace!(?req, res_count = ?res.len(), "answered mempool request"); + async move { Ok(Response::Transactions(res)) }.boxed() } - Request::RejectedTransactionIds(ids) => { - let res = storage.rejected_transactions(ids).collect(); + Request::RejectedTransactionIds(ref ids) => { + trace!(?req, "got mempool request"); + + let res = storage.rejected_transactions(ids.clone()).collect(); + + trace!(?req, ?res, "answered mempool request"); + async move { Ok(Response::RejectedTransactionIds(res)) }.boxed() } // Queue mempool candidates Request::Queue(gossiped_txs) => { + trace!(req_count = ?gossiped_txs.len(), "got mempool Queue request"); + let rsp: Vec> = gossiped_txs .into_iter() .map(|gossiped_tx| -> Result<(), MempoolError> { storage.should_download_or_verify(gossiped_tx.id())?; tx_downloads.download_if_needed_and_verify(gossiped_tx)?; + Ok(()) }) .map(|result| result.map_err(BoxError::from)) @@ -423,11 +474,17 @@ impl Service for Mempool { // Store successfully downloaded and verified transactions in the mempool Request::CheckForVerifiedTransactions => { + trace!(?req, "got mempool request"); + // all the work for this request is done in poll_ready async move { Ok(Response::CheckedForVerifiedTransactions) }.boxed() } }, ActiveState::Disabled => { + // TODO: add the name of the request, but not the content, + // like the command() or Display impls of network requests + trace!("got mempool request while mempool is disabled"); + // We can't return an error since that will cause a disconnection // by the peer connection handler. Therefore, return successful // empty responses. diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 472d9b477d5..6cffca9f73b 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -36,6 +36,7 @@ use futures::{ future::TryFutureExt, ready, stream::{FuturesUnordered, Stream}, + FutureExt, }; use pin_project::{pin_project, pinned_drop}; use thiserror::Error; @@ -273,6 +274,8 @@ where // Don't download/verify if the transaction is already in the state. Self::transaction_in_state(&mut state, txid).await?; + trace!(?txid, "transaction is not in state"); + let next_height = match state.oneshot(zs::Request::Tip).await { Ok(zs::Response::Tip(None)) => Ok(Height(0)), Ok(zs::Response::Tip(Some((height, _hash)))) => { @@ -284,6 +287,8 @@ where Err(e) => Err(TransactionDownloadVerifyError::StateError(e)), }?; + trace!(?txid, ?next_height, "got next height"); + let tx = match gossiped_tx { Gossip::Id(txid) => { let req = zn::Request::TransactionsById(std::iter::once(txid).collect()); @@ -322,6 +327,8 @@ where } }; + trace!(?txid, "got tx"); + let result = verifier .oneshot(tx::Request::Mempool { transaction: tx.clone(), @@ -333,7 +340,8 @@ where }) .await; - debug!(?txid, ?result, "verified transaction for the mempool"); + // Hide the transaction data to avoid filling the logs + trace!(?txid, result = ?result.as_ref().map(|_tx| ()), "verified transaction for the mempool"); result.map_err(|e| TransactionDownloadVerifyError::Invalid(e.into())) } @@ -348,6 +356,11 @@ where // Tack the hash onto the error so we can remove the cancel handle // on failure as well as on success. .map_err(move |e| (e, txid)) + .inspect(move |result| { + // Hide the transaction data to avoid filling the logs + let result = result.as_ref().map(|_tx| txid); + debug!("mempool transaction result: {result:?}"); + }) .in_current_span(); let task = tokio::spawn(async move { diff --git a/zebrad/src/components/mempool/storage.rs b/zebrad/src/components/mempool/storage.rs index 440bba0a7de..d4acc47f33a 100644 --- a/zebrad/src/components/mempool/storage.rs +++ b/zebrad/src/components/mempool/storage.rs @@ -179,6 +179,13 @@ impl Storage { // First, check if we have a cached rejection for this transaction. if let Some(error) = self.rejection_error(&tx_id) { + tracing::trace!( + ?tx_id, + ?error, + stored_transaction_count = ?self.verified.transaction_count(), + "returning cached error for transaction", + ); + return Err(error); } @@ -187,12 +194,25 @@ impl Storage { // Security: transactions must not get refreshed by new queries, // because that allows malicious peers to keep transactions live forever. if self.verified.contains(&tx_id) { + tracing::trace!( + ?tx_id, + stored_transaction_count = ?self.verified.transaction_count(), + "returning InMempool error for transaction that is already in the mempool", + ); + return Err(MempoolError::InMempool); } // Then, we try to insert into the pool. If this fails the transaction is rejected. let mut result = Ok(tx_id); if let Err(rejection_error) = self.verified.insert(tx) { + tracing::debug!( + ?tx_id, + ?rejection_error, + stored_transaction_count = ?self.verified.transaction_count(), + "insertion error for transaction", + ); + // We could return here, but we still want to check the mempool size self.reject(tx_id, rejection_error.clone().into()); result = Err(rejection_error.into()); diff --git a/zebrad/src/components/mempool/storage/verified_set.rs b/zebrad/src/components/mempool/storage/verified_set.rs index 4f48445498a..3735e4b6a48 100644 --- a/zebrad/src/components/mempool/storage/verified_set.rs +++ b/zebrad/src/components/mempool/storage/verified_set.rs @@ -72,7 +72,7 @@ impl VerifiedSet { } /// Returns `true` if the set of verified transactions contains the transaction with the - /// specified `id. + /// specified [`UnminedTxId`]. pub fn contains(&self, id: &UnminedTxId) -> bool { self.transactions.iter().any(|tx| &tx.transaction.id == id) } diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 47058a2e254..50584b849b8 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -102,6 +102,16 @@ //! ``` //! //! Please refer to the documentation of each test for more information. +//! +//! ## Disk Space for Testing +//! +//! The full sync and lightwalletd tests with cached state expect a temporary directory with +//! at least 300 GB of disk space (2 copies of the full chain). To use another disk for the +//! temporary test files: +//! +//! ```sh +//! export TMPDIR=/path/to/disk/directory +//! ``` use std::{collections::HashSet, env, fs, panic, path::PathBuf, time::Duration}; @@ -1819,8 +1829,12 @@ async fn fully_synced_rpc_test() -> Result<()> { let network = Network::Mainnet; - let (_zebrad, zebra_rpc_address) = - spawn_zebrad_for_rpc_without_initial_peers(network, cached_state_path.unwrap(), test_type)?; + let (_zebrad, zebra_rpc_address) = spawn_zebrad_for_rpc_without_initial_peers( + network, + cached_state_path.unwrap(), + test_type, + true, + )?; // Make a getblock test that works only on synced node (high block number). // The block is before the mandatory checkpoint, so the checkpoint cached state can be used diff --git a/zebrad/tests/common/cached_state.rs b/zebrad/tests/common/cached_state.rs index 3323c3ac716..a2e97dbd32d 100644 --- a/zebrad/tests/common/cached_state.rs +++ b/zebrad/tests/common/cached_state.rs @@ -63,21 +63,37 @@ pub async fn load_tip_height_from_state_directory( Ok(chain_tip_height) } -/// Recursively copy a chain state directory into a new temporary directory. -pub async fn copy_state_directory(source: impl AsRef) -> Result { +/// Recursively copy a chain state database directory into a new temporary directory. +pub async fn copy_state_directory(network: Network, source: impl AsRef) -> Result { + // Copy the database files for this state and network, excluding testnet and other state versions let source = source.as_ref(); + let state_config = zebra_state::Config { + cache_dir: source.into(), + ..Default::default() + }; + let source_net_dir = state_config.db_path(network); + let source_net_dir = source_net_dir.as_path(); + let state_suffix = source_net_dir + .strip_prefix(source) + .expect("db_path() is a subdirectory"); + let destination = testdir()?; + let destination_net_dir = destination.path().join(state_suffix); tracing::info!( ?source, + ?source_net_dir, + ?state_suffix, ?destination, + ?destination_net_dir, "copying cached state files (this may take some time)...", ); - let mut remaining_directories = vec![PathBuf::from(source)]; + let mut remaining_directories = vec![PathBuf::from(source_net_dir)]; while let Some(directory) = remaining_directories.pop() { - let sub_directories = copy_directory(&directory, source, destination.as_ref()).await?; + let sub_directories = + copy_directory(&directory, source_net_dir, destination_net_dir.as_ref()).await?; remaining_directories.extend(sub_directories); } diff --git a/zebrad/tests/common/launch.rs b/zebrad/tests/common/launch.rs index 6231200e199..dcbe2e4df19 100644 --- a/zebrad/tests/common/launch.rs +++ b/zebrad/tests/common/launch.rs @@ -23,7 +23,10 @@ use zebra_test::{ }; use zebrad::config::ZebradConfig; -use crate::common::lightwalletd::{random_known_rpc_port_config, LightwalletdTestType}; +use crate::common::{ + lightwalletd::{random_known_rpc_port_config, LightwalletdTestType}, + sync::FINISH_PARTIAL_SYNC_TIMEOUT, +}; /// After we launch `zebrad`, wait this long for the command to start up, /// take the actions expected by the tests, and log the expected logs. @@ -47,27 +50,14 @@ pub const BETWEEN_NODES_DELAY: Duration = Duration::from_secs(5); /// The amount of time we wait for lightwalletd to update to the tip. /// -/// `lightwalletd` takes about 90 minutes to fully sync, -/// and `zebrad` takes about 30 minutes to update to the tip. -/// -/// TODO: reduce to 20 minutes when `zebrad` sync performance improves -pub const LIGHTWALLETD_UPDATE_TIP_DELAY: Duration = Duration::from_secs(11 * 60 * 60); +/// `lightwalletd` takes about 60-120 minutes to fully sync, +/// and `zebrad` can take hours to update to the tip under load. +pub const LIGHTWALLETD_UPDATE_TIP_DELAY: Duration = FINISH_PARTIAL_SYNC_TIMEOUT; /// The amount of time we wait for lightwalletd to do a full sync to the tip. /// /// See [`LIGHTWALLETD_UPDATE_TIP_DELAY`] for details. -pub const LIGHTWALLETD_FULL_SYNC_TIP_DELAY: Duration = Duration::from_secs(11 * 60 * 60); - -/// The amount of extra time we wait for Zebra to sync to the tip, -/// after we ignore a lightwalletd failure. -/// -/// Since we restart `lightwalletd` after a hang, we allow time for another full `lightwalletd` sync. -/// -/// See [`LIGHTWALLETD_UPDATE_TIP_DELAY`] for details. -/// -/// TODO: remove this extra time when lightwalletd hangs are fixed -pub const ZEBRAD_EXTRA_DELAY_FOR_LIGHTWALLETD_WORKAROUND: Duration = - LIGHTWALLETD_FULL_SYNC_TIP_DELAY; +pub const LIGHTWALLETD_FULL_SYNC_TIP_DELAY: Duration = FINISH_PARTIAL_SYNC_TIMEOUT; /// Extension trait for methods on `tempfile::TempDir` for using it as a test /// directory for `zebrad`. @@ -214,6 +204,7 @@ pub fn spawn_zebrad_for_rpc_without_initial_peers Result<(TestChild

, SocketAddr)> { // This is what we recommend our users configure. let mut config = random_known_rpc_port_config(true) @@ -224,6 +215,7 @@ pub fn spawn_zebrad_for_rpc_without_initial_peers Duration { - let base_timeout = match self { + match self { LaunchWithEmptyState => LIGHTWALLETD_DELAY, FullSyncFromGenesis { .. } => LIGHTWALLETD_FULL_SYNC_TIP_DELAY, UpdateCachedState | UpdateZebraCachedStateNoRpc => LIGHTWALLETD_UPDATE_TIP_DELAY, - }; - - // If lightwalletd hangs and times out, Zebra needs a bit of extra time to finish - base_timeout + ZEBRAD_EXTRA_DELAY_FOR_LIGHTWALLETD_WORKAROUND + } } /// Returns the `lightwalletd` timeout for this test type. @@ -396,7 +393,7 @@ impl LightwalletdTestType { } // We use the same timeouts for zebrad and lightwalletd, - // because the tests swap between checking zebrad and lightwalletd. + // because the tests check zebrad and lightwalletd concurrently. match self { LaunchWithEmptyState => LIGHTWALLETD_DELAY, FullSyncFromGenesis { .. } => LIGHTWALLETD_FULL_SYNC_TIP_DELAY, diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index db638204c67..35f20db4c77 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -24,8 +24,11 @@ use futures::TryFutureExt; use tower::{Service, ServiceExt}; use zebra_chain::{ - block, chain_tip::ChainTip, parameters::Network, serialization::ZcashSerialize, - transaction::Transaction, + block, + chain_tip::ChainTip, + parameters::Network, + serialization::ZcashSerialize, + transaction::{self, Transaction}, }; use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY; use zebra_state::HashOrHeight; @@ -35,14 +38,30 @@ use crate::common::{ cached_state::{load_tip_height_from_state_directory, start_state_service_with_cache_dir}, launch::spawn_zebrad_for_rpc_without_initial_peers, lightwalletd::{ - wallet_grpc::{self, connect_to_lightwalletd, spawn_lightwalletd_with_rpc_server}, + wallet_grpc::{ + self, connect_to_lightwalletd, spawn_lightwalletd_with_rpc_server, Empty, Exclude, + }, zebra_skip_lightwalletd_tests, LightwalletdTestType::*, }, - sync::perform_full_sync_starting_from, + sync::copy_state_and_perform_full_sync, }; +/// The maximum number of transactions we want to send in the test. +/// This avoids filling the mempool queue and generating errors. +/// +/// TODO: replace with a const when `min()` stabilises as a const function: +/// https://github.com/rust-lang/rust/issues/92391 +fn max_sent_transactions() -> usize { + min(CHANNEL_AND_QUEUE_CAPACITY, MAX_INBOUND_CONCURRENCY) - 1 +} + /// The test entry point. +// +// TODO: +// - check output of zebrad and lightwalletd in different threads, +// to avoid test hangs due to full output pipes +// (see lightwalletd_integration_test for an example) pub async fn run() -> Result<()> { let _init_guard = zebra_test::init(); @@ -82,7 +101,7 @@ pub async fn run() -> Result<()> { ); let mut transactions = - load_transactions_from_a_future_block(network, zebrad_state_path.clone()).await?; + load_transactions_from_future_blocks(network, zebrad_state_path.clone()).await?; tracing::info!( transaction_count = ?transactions.len(), @@ -90,12 +109,17 @@ pub async fn run() -> Result<()> { "got transactions to send", ); - let (_zebrad, zebra_rpc_address) = - spawn_zebrad_for_rpc_without_initial_peers(Network::Mainnet, zebrad_state_path, test_type)?; + // TODO: change debug_skip_parameter_preload to true if we do the mempool test in the wallet gRPC test + let (mut zebrad, zebra_rpc_address) = spawn_zebrad_for_rpc_without_initial_peers( + Network::Mainnet, + zebrad_state_path, + test_type, + false, + )?; tracing::info!( ?zebra_rpc_address, - "spawned disconnected zebrad with shorter chain", + "spawned disconnected zebrad with shorter chain, waiting for mempool activation...", ); let (_lightwalletd, lightwalletd_rpc_port) = spawn_lightwalletd_with_rpc_server( @@ -107,25 +131,44 @@ pub async fn run() -> Result<()> { tracing::info!( ?lightwalletd_rpc_port, - "spawned lightwalletd connected to zebrad", + "spawned lightwalletd connected to zebrad, waiting for zebrad mempool activation...", + ); + + zebrad.expect_stdout_line_matches("activating mempool")?; + + // TODO: check that lightwalletd is at the tip using gRPC (#4894) + // + // If this takes a long time, we might need to check zebrad logs for failures in a separate thread. + + tracing::info!( + ?lightwalletd_rpc_port, + "connecting gRPC client to lightwalletd...", ); let mut rpc_client = connect_to_lightwalletd(lightwalletd_rpc_port).await?; // To avoid filling the mempool queue, limit the transactions to be sent to the RPC and mempool queue limits - transactions.truncate(min(CHANNEL_AND_QUEUE_CAPACITY, MAX_INBOUND_CONCURRENCY) - 1); + transactions.truncate(max_sent_transactions()); + + let transaction_hashes: Vec = + transactions.iter().map(|tx| tx.hash()).collect(); tracing::info!( transaction_count = ?transactions.len(), + ?transaction_hashes, "connected gRPC client to lightwalletd, sending transactions...", ); for transaction in transactions { + let transaction_hash = transaction.hash(); + let expected_response = wallet_grpc::SendResponse { error_code: 0, - error_message: format!("\"{}\"", transaction.hash()), + error_message: format!("\"{}\"", transaction_hash), }; + tracing::info!(?transaction_hash, "sending transaction..."); + let request = prepare_send_transaction_request(transaction); let response = rpc_client.send_transaction(request).await?.into_inner(); @@ -133,6 +176,64 @@ pub async fn run() -> Result<()> { assert_eq!(response, expected_response); } + tracing::info!("waiting for mempool to verify some transactions..."); + zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; + + tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); + let mut transactions_stream = rpc_client + .get_mempool_tx(Exclude { txid: vec![] }) + .await? + .into_inner(); + + // We'd like to check that lightwalletd queries the mempool, but it looks like it doesn't do it after each GetMempoolTx request. + //zebrad.expect_stdout_line_matches("answered mempool request req=TransactionIds")?; + + // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. + let mut counter = 0; + while let Some(tx) = transactions_stream.message().await? { + let hash: [u8; 32] = tx.hash.clone().try_into().expect("hash is correct length"); + let hash = transaction::Hash::from_bytes_in_display_order(&hash); + + assert!( + transaction_hashes.contains(&hash), + "unexpected transaction {hash:?}\n\ + in isolated mempool: {tx:?}", + ); + + counter += 1; + } + + // This RPC has temporarily been disabled in `lightwalletd`: + // https://github.com/adityapk00/lightwalletd/blob/b563f765f620e38f482954cd8ff3cc6d17cf2fa7/frontend/service.go#L529-L531 + // + // TODO: re-enable it when lightwalletd starts returning transactions again. + //assert!(counter >= 1, "all transactions from future blocks failed to send to an isolated mempool"); + assert_eq!( + counter, 0, + "developers: update this test for lightwalletd sending transactions" + ); + + // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. + tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); + let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); + + let mut counter = 0; + while let Some(_tx) = transaction_stream.message().await? { + // TODO: check tx.data or tx.height here? + + counter += 1; + } + + // This RPC has temporarily been disabled in `lightwalletd`: + // https://github.com/adityapk00/lightwalletd/blob/b563f765f620e38f482954cd8ff3cc6d17cf2fa7/frontend/service.go#L515-L517 + // + // TODO: re-enable it when lightwalletd starts streaming transactions again. + //assert!(counter >= 1, "all transactions from future blocks failed to send to an isolated mempool"); + assert_eq!( + counter, 0, + "developers: update this test for lightwalletd sending transactions" + ); + Ok(()) } @@ -146,7 +247,7 @@ pub async fn run() -> Result<()> { /// Returns a list of valid transactions that are not in any of the blocks present in the /// original `zebrad_state_path`. #[tracing::instrument] -async fn load_transactions_from_a_future_block( +async fn load_transactions_from_future_blocks( network: Network, zebrad_state_path: PathBuf, ) -> Result>> { @@ -160,7 +261,7 @@ async fn load_transactions_from_a_future_block( ); let full_sync_path = - perform_full_sync_starting_from(network, zebrad_state_path.as_ref()).await?; + copy_state_and_perform_full_sync(network, zebrad_state_path.as_ref()).await?; tracing::info!(?full_sync_path, "loading transactions..."); @@ -195,21 +296,38 @@ async fn load_transactions_from_block_after( assert!( tip_height > height, - "Chain not synchronized to a block after the specified height" + "Chain not synchronized to a block after the specified height", ); let mut target_height = height.0; let mut transactions = Vec::new(); - while transactions.is_empty() { - transactions = + while transactions.len() < max_sent_transactions() { + let new_transactions = load_transactions_from_block(block::Height(target_height), &mut state).await?; - transactions.retain(|transaction| !transaction.is_coinbase()); + if let Some(mut new_transactions) = new_transactions { + new_transactions.retain(|transaction| !transaction.is_coinbase()); + transactions.append(&mut new_transactions); + } else { + tracing::info!( + "Reached the end of the finalized chain\n\ + collected {} transactions from {} blocks before {target_height:?}", + transactions.len(), + target_height - height.0 - 1, + ); + break; + } target_height += 1; } + tracing::info!( + "Collected {} transactions from {} blocks before {target_height:?}", + transactions.len(), + target_height - height.0 - 1, + ); + Ok(transactions) } @@ -219,7 +337,7 @@ async fn load_transactions_from_block_after( async fn load_transactions_from_block( height: block::Height, state: &mut ReadStateService, -) -> Result>> +) -> Result>>> where ReadStateService: Service< zebra_state::ReadRequest, @@ -238,12 +356,15 @@ where let block = match response { zebra_state::ReadResponse::Block(Some(block)) => block, zebra_state::ReadResponse::Block(None) => { - panic!("Missing block at {height:?} from state") + tracing::info!( + "Reached the end of the finalized chain, state is missing block at {height:?}", + ); + return Ok(None); } _ => unreachable!("Incorrect response from state service: {response:?}"), }; - Ok(block.transactions.to_vec()) + Ok(Some(block.transactions.to_vec())) } /// Prepare a request to send to lightwalletd that contains a transaction to be sent. diff --git a/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs b/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs index 6669c2b46bb..8b4bade02e4 100644 --- a/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs +++ b/zebrad/tests/common/lightwalletd/wallet_grpc_test.rs @@ -13,14 +13,16 @@ //! - `GetBlockRange`: Covered. //! //! - `GetTransaction`: Covered. -//! - `SendTransaction`: Not covered and it will never be, it has its own test. +//! - `SendTransaction`: Covered by the send_transaction_test. //! //! - `GetTaddressTxids`: Covered. //! - `GetTaddressBalance`: Covered. //! - `GetTaddressBalanceStream`: Covered. //! -//! - `GetMempoolTx`: Not covered. -//! - `GetMempoolStream`: Not covered. +//! - `GetMempoolTx`: Covered by the send_transaction_test, +//! currently disabled by `lightwalletd`. +//! - `GetMempoolStream`: Covered by the send_transaction_test, +//! currently disabled by `lightwalletd`. //! //! - `GetTreeState`: Covered. //! @@ -28,6 +30,7 @@ //! - `GetAddressUtxosStream`: Covered. //! //! - `GetLightdInfo`: Covered. +//! //! - `Ping`: Not covered and it will never be. `Ping` is only used for testing //! purposes. @@ -56,6 +59,11 @@ use crate::common::{ }; /// The test entry point. +// +// TODO: +// - check output of zebrad and lightwalletd in different threads, +// to avoid test hangs due to full output pipes +// (see lightwalletd_integration_test for an example) pub async fn run() -> Result<()> { let _init_guard = zebra_test::init(); @@ -93,12 +101,18 @@ pub async fn run() -> Result<()> { ); // Launch zebra using a predefined zebrad state path - let (_zebrad, zebra_rpc_address) = - spawn_zebrad_for_rpc_without_initial_peers(network, zebrad_state_path.unwrap(), test_type)?; + // + // TODO: change debug_skip_parameter_preload to true if we do the mempool test in the send transaction test + let (mut zebrad, zebra_rpc_address) = spawn_zebrad_for_rpc_without_initial_peers( + network, + zebrad_state_path.unwrap(), + test_type, + false, + )?; tracing::info!( ?zebra_rpc_address, - "launching lightwalletd connected to zebrad...", + "launching lightwalletd connected to zebrad, waiting for the mempool to activate...", ); // Launch lightwalletd @@ -109,8 +123,19 @@ pub async fn run() -> Result<()> { false, )?; - // Give lightwalletd a few seconds to open its grpc port before connecting to it - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + tracing::info!( + ?lightwalletd_rpc_port, + "spawned lightwalletd connected to zebrad, waiting for zebrad mempool activation...", + ); + + zebrad.expect_stdout_line_matches("activating mempool")?; + + // Give lightwalletd a few seconds to sync to the tip before connecting to it + // + // TODO: check that lightwalletd is at the tip using gRPC (#4894) + // + // If this takes a long time, we might need to check zebrad logs for failures in a separate thread. + tokio::time::sleep(std::time::Duration::from_secs(60)).await; tracing::info!( ?lightwalletd_rpc_port, @@ -277,8 +302,6 @@ pub async fn run() -> Result<()> { balance_zf.value_zat + balance_mg.value_zat ); - // TODO: Create call and checks for `GetMempoolTx` and `GetMempoolTxStream`? - let sapling_treestate_init_height = sapling_activation_height + 1; // Call `GetTreeState`. diff --git a/zebrad/tests/common/sync.rs b/zebrad/tests/common/sync.rs index 743a830eb92..0a130054cb4 100644 --- a/zebrad/tests/common/sync.rs +++ b/zebrad/tests/common/sync.rs @@ -72,8 +72,7 @@ pub const LARGE_CHECKPOINT_TIMEOUT: Duration = Duration::from_secs(180); /// The partially synchronized state is expected to be close to the tip, so this timeout can be /// lower than what's expected for a full synchronization. However, a value that's too short may /// cause the test to fail. -#[allow(dead_code)] -pub const FINISH_PARTIAL_SYNC_TIMEOUT: Duration = Duration::from_secs(60 * 60); +pub const FINISH_PARTIAL_SYNC_TIMEOUT: Duration = Duration::from_secs(11 * 60 * 60); /// The test sync height where we switch to using the default lookahead limit. /// @@ -300,11 +299,11 @@ pub fn sync_until( /// is returned afterwards, containing the fully synchronized chain state. #[allow(dead_code)] #[tracing::instrument] -pub async fn perform_full_sync_starting_from( +pub async fn copy_state_and_perform_full_sync( network: Network, partial_sync_path: &Path, ) -> Result { - let fully_synced_path = copy_state_directory(&partial_sync_path).await?; + let fully_synced_path = copy_state_directory(network, &partial_sync_path).await?; sync_until( Height::MAX,