diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index c121e3c9a4a..4f9e4bfb40e 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -872,7 +872,7 @@ where hashes .iter() .map(|(tx_loc, tx_id)| { - // TODO: downgrade to debug, because there's nothing the user can do + // Check that the returned transactions are in chain order. assert!( *tx_loc > last_tx_location, "Transactions were not in chain order:\n\ @@ -931,7 +931,7 @@ where let satoshis = u64::from(utxo_data.3.value); let output_location = *utxo_data.2; - // TODO: downgrade to debug, because there's nothing the user can do + // Check that the returned UTXOs are in chain order. assert!( output_location > last_output_location, "UTXOs were not in chain order:\n\ @@ -1272,17 +1272,19 @@ impl GetRawTransaction { /// Check if provided height range is valid for address indexes. fn check_height_range(start: Height, end: Height, chain_height: Height) -> Result<()> { if start == Height(0) || end == Height(0) { - return Err(Error::invalid_params( - "Start and end are expected to be greater than zero", - )); + return Err(Error::invalid_params(format!( + "start {start:?} and end {end:?} must both be greater than zero" + ))); } - if end < start { - return Err(Error::invalid_params( - "End value is expected to be greater than or equal to start", - )); + if start > end { + return Err(Error::invalid_params(format!( + "start {start:?} must be less than or equal to end {end:?}" + ))); } if start > chain_height || end > chain_height { - return Err(Error::invalid_params("Start or end is outside chain range")); + return Err(Error::invalid_params(format!( + "start {start:?} and end {end:?} must both be less than or equal to the chain tip {chain_height:?}" + ))); } Ok(()) diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 114602ca7d9..ab09932d856 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -395,7 +395,7 @@ async fn rpc_getaddresstxids_invalid_arguments() { .unwrap_err(); assert_eq!( error.message, - "End value is expected to be greater than or equal to start".to_string() + "start Height(2) must be less than or equal to end Height(1)".to_string() ); // call the method with start equal zero @@ -411,7 +411,7 @@ async fn rpc_getaddresstxids_invalid_arguments() { .unwrap_err(); assert_eq!( error.message, - "Start and end are expected to be greater than zero".to_string() + "start Height(0) and end Height(1) must both be greater than zero".to_string() ); // call the method outside the chain tip height @@ -427,7 +427,7 @@ async fn rpc_getaddresstxids_invalid_arguments() { .unwrap_err(); assert_eq!( error.message, - "Start or end is outside chain range".to_string() + "start Height(1) and end Height(11) must both be less than or equal to the chain tip Height(10)".to_string() ); mempool.expect_no_requests().await; diff --git a/zebra-state/src/arbitrary.rs b/zebra-state/src/arbitrary.rs index 2d0c95d3c1a..653cd5aa0cf 100644 --- a/zebra-state/src/arbitrary.rs +++ b/zebra-state/src/arbitrary.rs @@ -17,6 +17,8 @@ use crate::{ /// Mocks computation done during semantic validation pub trait Prepare { + /// Runs block semantic validation computation, and returns the result. + /// Test-only method. 
fn prepare(self) -> PreparedBlock; } diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index 89505af7acf..8cfe7e132ae 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -16,7 +16,8 @@ extern crate tracing; #[cfg(any(test, feature = "proptest-impl"))] -mod arbitrary; +pub mod arbitrary; + mod config; pub mod constants; mod error; @@ -39,7 +40,7 @@ pub use service::{ #[cfg(any(test, feature = "proptest-impl"))] pub use service::{ - arbitrary::populated_state, + arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT}, chain_tip::{ChainTipBlock, ChainTipSender}, init_test, init_test_services, }; diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 7bf45ec02b2..044c6a9a9c0 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -381,20 +381,44 @@ pub enum Request { /// documentation for details. CommitBlock(PreparedBlock), - /// Commit a finalized block to the state, skipping all validation. + /// Commit a checkpointed block to the state, skipping most block validation. /// /// This is exposed for use in checkpointing, which produces finalized /// blocks. It is the caller's responsibility to ensure that the block is - /// valid and final. This request can be made out-of-order; the state service - /// will queue it until its parent is ready. + /// semantically valid and final. This request can be made out-of-order; + /// the state service will queue it until its parent is ready. /// /// Returns [`Response::Committed`] with the hash of the newly committed /// block, or an error. /// /// This request cannot be cancelled once submitted; dropping the response /// future will have no effect on whether it is eventually processed. - /// Duplicate requests should not be made, because it is the caller's - /// responsibility to ensure that each block is valid and final. + /// Duplicate requests will replace the older duplicate, and return an error + /// in its response future. + /// + /// # Note + /// + /// Finalized and non-finalized blocks are an internal Zebra implementation detail. + /// There is no difference between these blocks on the network, or in Zebra's + /// network or syncer implementations. + /// + /// # Consensus + /// + /// Checkpointing is allowed under the Zcash "social consensus" rules. + /// Zebra checkpoints both settled network upgrades, and blocks past the rollback limit. + /// (By the time Zebra release is tagged, its final checkpoint is typically hours or days old.) + /// + /// > A network upgrade is settled on a given network when there is a social consensus + /// > that it has activated with a given activation block hash. A full validator that + /// > potentially risks Mainnet funds or displays Mainnet transaction information to a user + /// > MUST do so only for a block chain that includes the activation block of the most + /// > recent settled network upgrade, with the corresponding activation block hash. + /// > ... + /// > A full validator MAY impose a limit on the number of blocks it will “roll back” + /// > when switching from one best valid block chain to another that is not a descendent. + /// > For `zcashd` and `zebra` this limit is 100 blocks. 
+ /// + /// /// /// # Correctness /// diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index f69017728db..de881879171 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -19,6 +19,7 @@ use std::{ convert, future::Future, pin::Pin, + sync::{Arc, Mutex}, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -65,6 +66,7 @@ mod non_finalized_state; mod pending_utxos; mod queued_blocks; pub(crate) mod read; +mod write; #[cfg(any(test, feature = "proptest-impl"))] pub mod arbitrary; @@ -74,7 +76,7 @@ mod tests; pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation}; -use self::queued_blocks::QueuedFinalized; +use self::queued_blocks::{QueuedFinalized, QueuedNonFinalized}; /// A read-write service for Zebra's cached blockchain state. /// @@ -126,6 +128,43 @@ pub(crate) struct StateService { // and block write task share ownership of the database. pub(crate) disk: FinalizedState, + /// A channel to send blocks to the `block_write_task`, + /// so they can be written to the [`NonFinalizedState`]. + // + // TODO: actually send blocks on this channel + non_finalized_block_write_sender: + Option<tokio::sync::mpsc::UnboundedSender<QueuedNonFinalized>>, + + /// A channel to send blocks to the `block_write_task`, + /// so they can be written to the [`FinalizedState`]. + /// + /// This sender is dropped after the state has finished sending all the checkpointed blocks, + /// and the lowest non-finalized block arrives. + finalized_block_write_sender: Option<tokio::sync::mpsc::UnboundedSender<QueuedFinalized>>, + + /// The [`block::Hash`] of the most recent block sent on + /// `finalized_block_write_sender` or `non_finalized_block_write_sender`. + /// + /// On startup, this is: + /// - the finalized tip, if there are stored blocks, or + /// - the genesis block's parent hash, if the database is empty. + /// + /// If `invalid_block_reset_receiver` gets a reset, this is: + /// - the hash of the last valid committed block (the parent of the invalid block). + // + // TODO: + // - turn this into an IndexMap containing recent non-finalized block hashes and heights + // (they are all potential tips) + // - remove block hashes once their heights are strictly less than the finalized tip + last_block_hash_sent: block::Hash, + + /// If an invalid block is sent on `finalized_block_write_sender` + /// or `non_finalized_block_write_sender`, + /// this channel gets the [`block::Hash`] of the valid tip. + // + // TODO: add tests for finalized and non-finalized resets (#2654) + invalid_block_reset_receiver: tokio::sync::mpsc::UnboundedReceiver<block::Hash>, + // Pending UTXO Request Tracking // /// The set of outpoints with pending requests for their associated transparent::Output. @@ -134,15 +173,19 @@ pub(crate) struct StateService { /// Instant tracking the last time `pending_utxos` was pruned. last_prune: Instant, - // Concurrently Readable State + // Updating Concurrently Readable State // /// A sender channel used to update the current best chain tip for /// [`LatestChainTip`] and [`ChainTipChange`]. - chain_tip_sender: ChainTipSender, + // + // TODO: remove this copy of the chain tip sender, and get rid of the mutex in the block write task + chain_tip_sender: Arc<Mutex<ChainTipSender>>, /// A sender channel used to update the recent non-finalized state for the [`ReadStateService`]. non_finalized_state_sender: watch::Sender<NonFinalizedState>, + // Concurrently Readable State + // /// A cloneable [`ReadStateService`], used to answer concurrent read requests. /// /// TODO: move users of read [`Request`]s to [`ReadStateService`], and remove `read_service`. 
@@ -154,7 +197,9 @@ pub(crate) struct StateService { /// /// Set to `f64::NAN` if `queued_finalized_blocks` is empty, because grafana shows NaNs /// as a break in the graph. - max_queued_height: f64, + // + // TODO: add a similar metric for `queued_non_finalized_blocks` + max_queued_finalized_height: f64, } /// A read-only service for accessing Zebra's cached blockchain state. @@ -177,7 +222,7 @@ pub struct ReadStateService { // Shared Concurrently Readable State // - /// A watch channel for a recent [`NonFinalizedState`]. + /// A watch channel with a cached copy of the [`NonFinalizedState`]. /// /// This state is only updated between requests, /// so it might include some block data that is also on `disk`. @@ -191,6 +236,63 @@ pub struct ReadStateService { /// This chain is updated concurrently with requests, /// so it might include some block data that is also in `best_mem`. db: ZebraDb, + + /// A shared handle to a task that writes blocks to the [`NonFinalizedState`] or [`FinalizedState`], + /// once the queues have received all their parent blocks. + /// + /// Used to check for panics when writing blocks. + block_write_task: Option>>, +} + +impl Drop for StateService { + fn drop(&mut self) { + // The state service owns the state, tasks, and channels, + // so dropping it should shut down everything. + + // Close the channels (non-blocking) + // This makes the block write thread exit the next time it checks the channels. + // We want to do this here so we get any errors or panics from the block write task before it shuts down. + self.invalid_block_reset_receiver.close(); + + std::mem::drop(self.finalized_block_write_sender.take()); + std::mem::drop(self.non_finalized_block_write_sender.take()); + + self.clear_finalized_block_queue("dropping the state: dropped unused queued block"); + + // Then drop self.read_service, which checks the block write task for panics, + // and tries to shut down the database. + } +} + +impl Drop for ReadStateService { + fn drop(&mut self) { + // The read state service shares the state, + // so dropping it should check if we can shut down. + + if let Some(block_write_task) = self.block_write_task.take() { + if let Ok(block_write_task_handle) = Arc::try_unwrap(block_write_task) { + // We're the last database user, so we can tell it to shut down (blocking): + // - flushes the database to disk, and + // - drops the database, which cleans up any database tasks correctly. + self.db.shutdown(true); + + // We are the last state with a reference to this thread, so we can + // wait until the block write task finishes, then check for panics (blocking). + // (We'd also like to abort the thread, but std::thread::JoinHandle can't do that.) + info!("waiting for the block write task to finish"); + if let Err(thread_panic) = block_write_task_handle.join() { + std::panic::resume_unwind(thread_panic); + } else { + info!("shutting down the state without waiting for the block write task"); + } + } + } else { + // Even if we're not the last database user, try shutting it down. + // + // TODO: rename this to try_shutdown()? 
+ self.db.shutdown(false); + } + } } impl StateService { @@ -205,12 +307,12 @@ impl StateService { ) -> (Self, ReadStateService, LatestChainTip, ChainTipChange) { let timer = CodeTimer::start(); - let disk = FinalizedState::new(&config, network); + let finalized_state = FinalizedState::new(&config, network); timer.finish(module_path!(), line!(), "opening finalized state database"); let timer = CodeTimer::start(); - let initial_tip = disk - .db() + let initial_tip = finalized_state + .db .tip_block() .map(FinalizedBlock::from) .map(ChainTipBlock::from); @@ -219,26 +321,56 @@ impl StateService { let timer = CodeTimer::start(); let (chain_tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(initial_tip, network); - - let mem = NonFinalizedState::new(network); - - let (read_service, non_finalized_state_sender) = ReadStateService::new(&disk); + let chain_tip_sender = Arc::new(Mutex::new(chain_tip_sender)); + + let non_finalized_state = NonFinalizedState::new(network); + + // Security: The number of blocks in these channels is limited by + // the syncer and inbound lookahead limits. + let (non_finalized_block_write_sender, non_finalized_block_write_receiver) = + tokio::sync::mpsc::unbounded_channel(); + let (finalized_block_write_sender, finalized_block_write_receiver) = + tokio::sync::mpsc::unbounded_channel(); + let (invalid_block_reset_sender, invalid_block_reset_receiver) = + tokio::sync::mpsc::unbounded_channel(); + + let finalized_state_for_writing = finalized_state.clone(); + let chain_tip_sender_for_writing = chain_tip_sender.clone(); + let block_write_task = std::thread::spawn(move || { + write::write_blocks_from_channels( + finalized_block_write_receiver, + non_finalized_block_write_receiver, + finalized_state_for_writing, + invalid_block_reset_sender, + chain_tip_sender_for_writing, + ) + }); + let block_write_task = Arc::new(block_write_task); + + let (read_service, non_finalized_state_sender) = + ReadStateService::new(&finalized_state, block_write_task); let queued_non_finalized_blocks = QueuedBlocks::default(); let pending_utxos = PendingUtxos::default(); + let last_block_hash_sent = finalized_state.db.finalized_tip_hash(); + let state = Self { network, queued_non_finalized_blocks, queued_finalized_blocks: HashMap::new(), - mem, - disk, + mem: non_finalized_state, + disk: finalized_state, + non_finalized_block_write_sender: Some(non_finalized_block_write_sender), + finalized_block_write_sender: Some(finalized_block_write_sender), + last_block_hash_sent, + invalid_block_reset_receiver, pending_utxos, last_prune: Instant::now(), chain_tip_sender, non_finalized_state_sender, read_service: read_service.clone(), - max_queued_height: f64::NAN, + max_queued_finalized_height: f64::NAN, }; timer.finish(module_path!(), line!(), "initializing state service"); @@ -256,7 +388,7 @@ impl StateService { state.network, MAX_LEGACY_CHAIN_BLOCKS, ) { - let legacy_db_path = state.disk.path().to_path_buf(); + let legacy_db_path = state.read_service.db.path().to_path_buf(); panic!( "Cached state contains a legacy chain.\n\ An outdated Zebra version did not know about a recent network upgrade,\n\ @@ -275,75 +407,147 @@ impl StateService { } /// Queue a finalized block for verification and storage in the finalized state. + /// + /// Returns a channel receiver that provides the result of the block commit. 
fn queue_and_commit_finalized( &mut self, finalized: FinalizedBlock, ) -> oneshot::Receiver> { + // # Correctness & Performance + // + // This method must not block, access the database, or perform CPU-intensive tasks, + // because it is called directly from the tokio executor's Future threads. + + let queued_prev_hash = finalized.block.header.previous_block_hash; + let queued_height = finalized.height; + let (rsp_tx, rsp_rx) = oneshot::channel(); + let queued = (finalized, rsp_tx); + + if self.finalized_block_write_sender.is_some() { + // We're still committing finalized blocks + if let Some(duplicate_queued) = self + .queued_finalized_blocks + .insert(queued_prev_hash, queued) + { + Self::send_finalized_block_error( + duplicate_queued, + "dropping older finalized block: got newer duplicate block", + ); + } - // TODO: move this code into the state block commit task: - // - queue_and_commit_finalized()'s commit_finalized() call becomes a send to the block commit channel - // - run commit_finalized() in the state block commit task - // - run the metrics update in queue_and_commit_finalized() in the block commit task - // - run the set_finalized_tip() in this function in the state block commit task - // - move all that code to the inner service - let tip_block = self - .drain_queue_and_commit_finalized((finalized, rsp_tx)) - .map(ChainTipBlock::from); + self.drain_queue_and_commit_finalized(); + } else { + // We've finished committing finalized blocks, so drop any repeated queued blocks, + // and return an error. + // + // TODO: track the latest sent height, and drop any blocks under that height + // every time we send some blocks (like QueuedNonFinalizedBlocks) + Self::send_finalized_block_error( + queued, + "already finished committing finalized blocks: dropped duplicate block, \ + block is already committed to the state", + ); + + self.clear_finalized_block_queue( + "already finished committing finalized blocks: dropped duplicate block, \ + block is already committed to the state", + ); + } + + if self.queued_finalized_blocks.is_empty() { + self.max_queued_finalized_height = f64::NAN; + } else if self.max_queued_finalized_height.is_nan() + || self.max_queued_finalized_height < queued_height.0 as f64 + { + // if there are still blocks in the queue, then either: + // - the new block was lower than the old maximum, and there was a gap before it, + // so the maximum is still the same (and we skip this code), or + // - the new block is higher than the old maximum, and there is at least one gap + // between the finalized tip and the new maximum + self.max_queued_finalized_height = queued_height.0 as f64; + } - self.chain_tip_sender.set_finalized_tip(tip_block); + metrics::gauge!( + "state.checkpoint.queued.max.height", + self.max_queued_finalized_height + ); + metrics::gauge!( + "state.checkpoint.queued.block.count", + self.queued_finalized_blocks.len() as f64, + ); rsp_rx } - /// Queue a finalized block to be committed to the state. + /// Finds queued finalized blocks to be committed to the state in order, + /// removes them from the queue, and sends them to the block commit task. /// /// After queueing a finalized block, this method checks whether the newly /// queued block (and any of its descendants) can be committed to the state. /// - /// Returns the highest finalized tip block committed from the queue, - /// or `None` if no blocks were committed in this call. - /// (Use `tip_block` to get the finalized tip, regardless of when it was committed.) 
- pub fn drain_queue_and_commit_finalized( - &mut self, - queued: QueuedFinalized, - ) -> Option { - let mut highest_queue_commit = None; - - let prev_hash = queued.0.block.header.previous_block_hash; - let height = queued.0.height; - self.queued_finalized_blocks.insert(prev_hash, queued); + /// Returns an error if the block commit channel has been closed. + pub fn drain_queue_and_commit_finalized(&mut self) { + use tokio::sync::mpsc::error::{SendError, TryRecvError}; + + // # Correctness & Performance + // + // This method must not block, access the database, or perform CPU-intensive tasks, + // because it is called directly from the tokio executor's Future threads. + + // If a block failed, we need to start again from a valid tip. + match self.invalid_block_reset_receiver.try_recv() { + Ok(reset_tip_hash) => self.last_block_hash_sent = reset_tip_hash, + Err(TryRecvError::Disconnected) => { + info!("Block commit task closed the block reset channel. Is Zebra shutting down?"); + return; + } + // There are no errors, so we can just use the last block hash we sent + Err(TryRecvError::Empty) => {} + } while let Some(queued_block) = self .queued_finalized_blocks - .remove(&self.disk.db().finalized_tip_hash()) + .remove(&self.last_block_hash_sent) { - if let Ok(finalized) = self.disk.commit_finalized(queued_block) { - highest_queue_commit = Some(finalized); - } else { - // the last block in the queue failed, so we can't commit the next block - break; + self.last_block_hash_sent = queued_block.0.hash; + + // If we've finished sending finalized blocks, ignore any repeated blocks. + // (Blocks can be repeated after a syncer reset.) + if let Some(finalized_block_write_sender) = &self.finalized_block_write_sender { + let send_result = finalized_block_write_sender.send(queued_block); + + // If the receiver is closed, we can't send any more blocks. + if let Err(SendError(queued)) = send_result { + // If Zebra is shutting down, drop blocks and return an error. + Self::send_finalized_block_error( + queued, + "block commit task exited. Is Zebra shutting down?", + ); + + self.clear_finalized_block_queue( + "block commit task exited. Is Zebra shutting down?", + ); + }; } } + } - if self.queued_finalized_blocks.is_empty() { - self.max_queued_height = f64::NAN; - } else if self.max_queued_height.is_nan() || self.max_queued_height < height.0 as f64 { - // if there are still blocks in the queue, then either: - // - the new block was lower than the old maximum, and there was a gap before it, - // so the maximum is still the same (and we skip this code), or - // - the new block is higher than the old maximum, and there is at least one gap - // between the finalized tip and the new maximum - self.max_queued_height = height.0 as f64; + /// Drops all queued finalized blocks, and sends an error on their result channels. 
+ fn clear_finalized_block_queue(&mut self, error: impl Into + Clone) { + for (_hash, queued) in self.queued_finalized_blocks.drain() { + Self::send_finalized_block_error(queued, error.clone()); } + } - metrics::gauge!("state.checkpoint.queued.max.height", self.max_queued_height); - metrics::gauge!( - "state.checkpoint.queued.block.count", - self.queued_finalized_blocks.len() as f64, - ); + /// Send an error on a `QueuedFinalized` block's result channel, and drop the block + fn send_finalized_block_error(queued: QueuedFinalized, error: impl Into) { + let (finalized, rsp_tx) = queued; - highest_queue_commit + // The block sender might have already given up on this block, + // so ignore any channel send errors. + let _ = rsp_tx.send(Err(error.into())); + std::mem::drop(finalized); } /// Queue a non finalized block for verification and check if any queued @@ -362,7 +566,7 @@ impl StateService { let parent_hash = prepared.block.header.previous_block_hash; if self.mem.any_chain_contains(&prepared.hash) - || self.disk.db().hash(prepared.height).is_some() + || self.read_service.db.hash(prepared.height).is_some() { let (rsp_tx, rsp_rx) = oneshot::channel(); let _ = rsp_tx.send(Err("block is already committed to the state".into())); @@ -386,6 +590,31 @@ impl StateService { rsp_rx }; + // We've finished sending finalized blocks when: + // - we've sent the finalized block for the last checkpoint, and + // - it has been successfully written to disk. + // + // We detect the last checkpoint by looking for non-finalized blocks + // that are a child of the last block we sent. + // + // TODO: configure the state with the last checkpoint hash instead? + if self.finalized_block_write_sender.is_some() + && self + .queued_non_finalized_blocks + .has_queued_children(self.last_block_hash_sent) + && self.read_service.db.finalized_tip_hash() == self.last_block_hash_sent + { + // Tell the block write task to stop committing finalized blocks, + // and move on to committing non-finalized blocks. + std::mem::drop(self.finalized_block_write_sender.take()); + + // We've finished committing finalized blocks, so drop any repeated queued blocks. + self.clear_finalized_block_queue( + "already finished committing finalized blocks: dropped duplicate block, \ + block is already committed to the state", + ); + } + // TODO: avoid a temporary verification failure that can happen // if the first non-finalized block arrives before the last finalized block is committed // (#5125) @@ -411,7 +640,7 @@ impl StateService { ); } - let finalized_tip_height = self.disk.db().finalized_tip_height().expect( + let finalized_tip_height = self.read_service.db.finalized_tip_height().expect( "Finalized state must have at least one block before committing non-finalized state", ); self.queued_non_finalized_blocks @@ -447,6 +676,9 @@ impl StateService { /// non-finalized state is empty. /// /// [1]: non_finalized_state::Chain + // + // TODO: remove this clippy allow when we remove self.chain_tip_sender + #[allow(clippy::unwrap_in_result)] #[instrument(level = "debug", skip(self))] fn update_latest_chain_channels(&mut self) -> Option { let best_chain = self.mem.best_chain(); @@ -459,7 +691,10 @@ impl StateService { // If the final receiver was just dropped, ignore the error. 
let _ = self.non_finalized_state_sender.send(self.mem.clone()); - self.chain_tip_sender.set_best_non_finalized_tip(tip_block); + self.chain_tip_sender + .lock() + .expect("unexpected panic in block commit task or state") + .set_best_non_finalized_tip(tip_block); tip_block_height } @@ -471,10 +706,10 @@ impl StateService { self.check_contextual_validity(&prepared)?; let parent_hash = prepared.block.header.previous_block_hash; - if self.disk.db().finalized_tip_hash() == parent_hash { - self.mem.commit_new_chain(prepared, self.disk.db())?; + if self.disk.db.finalized_tip_hash() == parent_hash { + self.mem.commit_new_chain(prepared, &self.disk.db)?; } else { - self.mem.commit_block(prepared, self.disk.db())?; + self.mem.commit_block(prepared, &self.disk.db)?; } Ok(()) @@ -482,7 +717,7 @@ impl StateService { /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks. fn can_fork_chain_at(&self, hash: &block::Hash) -> bool { - self.mem.any_chain_contains(hash) || &self.disk.db().finalized_tip_hash() == hash + self.mem.any_chain_contains(hash) || &self.read_service.db.finalized_tip_hash() == hash } /// Attempt to validate and commit all queued blocks whose parents have @@ -547,25 +782,25 @@ impl StateService { check::block_is_valid_for_recent_chain( prepared, self.network, - self.disk.db().finalized_tip_height(), + self.disk.db.finalized_tip_height(), relevant_chain, )?; - check::nullifier::no_duplicates_in_finalized_chain(prepared, self.disk.db())?; + check::nullifier::no_duplicates_in_finalized_chain(prepared, &self.disk.db)?; Ok(()) } /// Return the tip of the current best chain. pub fn best_tip(&self) -> Option<(block::Height, block::Hash)> { - self.mem.best_tip().or_else(|| self.disk.db().tip()) + self.mem.best_tip().or_else(|| self.read_service.db.tip()) } /// Return the height for the block at `hash` in any chain. pub fn any_height_by_hash(&self, hash: block::Hash) -> Option { self.mem .any_height_by_hash(hash) - .or_else(|| self.disk.db().height(hash)) + .or_else(|| self.read_service.db.height(hash)) } /// Return an iterator over the relevant chain of the block identified by @@ -593,18 +828,23 @@ impl StateService { } impl ReadStateService { - /// Creates a new read-only state service, using the provided finalized state. + /// Creates a new read-only state service, using the provided finalized state and + /// block write task handle. /// /// Returns the newly created service, /// and a watch channel for updating the shared recent non-finalized chain. 
- pub(crate) fn new(disk: &FinalizedState) -> (Self, watch::Sender<NonFinalizedState>) { + pub(crate) fn new( + finalized_state: &FinalizedState, + block_write_task: Arc<std::thread::JoinHandle<()>>, + ) -> (Self, watch::Sender<NonFinalizedState>) { let (non_finalized_state_sender, non_finalized_state_receiver) = - watch::channel(NonFinalizedState::new(disk.network())); + watch::channel(NonFinalizedState::new(finalized_state.network())); let read_service = Self { - network: disk.network(), - db: disk.db().clone(), + network: finalized_state.network(), + db: finalized_state.db.clone(), non_finalized_state_receiver: WatchReceiver::new(non_finalized_state_receiver), + block_write_task: Some(block_write_task), }; tracing::info!("created new read-only state service"); @@ -619,7 +859,11 @@ impl Service<Request> for StateService { type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + // Check for panics in the block write task + let poll = self.read_service.poll_ready(cx); + + // Prune outdated UTXO requests let now = Instant::now(); if self.last_prune + Self::PRUNE_INTERVAL < now { @@ -646,7 +890,7 @@ impl Service<Request> for StateService { } } - Poll::Ready(Ok(())) + poll } #[instrument(name = "state", skip(self, req))] @@ -679,6 +923,10 @@ impl Service<Request> for StateService { span.in_scope(|| self.queue_and_commit_non_finalized(prepared)) }); + // TODO: + // - check for panics in the block write task here, + // as well as in poll_ready() + // The work is all done, the future just waits on a channel for the result timer.finish(module_path!(), line!(), "CommitBlock"); @@ -700,7 +948,7 @@ impl Service<Request> for StateService { } // Uses queued_finalized_blocks and pending_utxos in the StateService. - // Accesses shared writeable state in the StateService and ZebraDb. + // Accesses shared writeable state in the StateService. Request::CommitFinalizedBlock(finalized) => { let timer = CodeTimer::start(); @@ -716,14 +964,13 @@ impl Service<Request> for StateService { // # Performance // - // Allow other async tasks to make progress while blocks are being verified - // and written to disk. - // - // See the note in `CommitBlock` for more details. - let span = Span::current(); - let rsp_rx = tokio::task::block_in_place(move || { - span.in_scope(|| self.queue_and_commit_finalized(finalized)) - }); + // This method doesn't block, access the database, or perform CPU-intensive tasks, + // so we can run it directly in the tokio executor's Future threads. 
+ let rsp_rx = self.queue_and_commit_finalized(finalized); + + // TODO: + // - check for panics in the block write task here, + // as well as in poll_ready() // The work is all done, the future just waits on a channel for the result timer.finish(module_path!(), line!(), "CommitFinalizedBlock"); @@ -847,6 +1094,27 @@ impl Service for ReadStateService { Pin> + Send + 'static>>; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + // Check for panics in the block write task + let block_write_task = self.block_write_task.take(); + + if let Some(block_write_task) = block_write_task { + if block_write_task.is_finished() { + match Arc::try_unwrap(block_write_task) { + // We are the last state with a reference to this task, so we can propagate any panics + Ok(block_write_task_handle) => { + if let Err(thread_panic) = block_write_task_handle.join() { + std::panic::resume_unwind(thread_panic); + } + } + // We're not the last state, so we need to put it back + Err(arc_block_write_task) => self.block_write_task = Some(arc_block_write_task), + } + } else { + // It hasn't finished, so we need to put it back + self.block_write_task = Some(block_write_task); + } + } + Poll::Ready(Ok(())) } diff --git a/zebra-state/src/service/arbitrary.rs b/zebra-state/src/service/arbitrary.rs index ed723321674..7b0d6195f72 100644 --- a/zebra-state/src/service/arbitrary.rs +++ b/zebra-state/src/service/arbitrary.rs @@ -1,6 +1,6 @@ //! Arbitrary data generation and test setup for Zebra's state. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use futures::{stream::FuturesUnordered, StreamExt}; use proptest::{ @@ -9,11 +9,12 @@ use proptest::{ strategy::{NewTree, ValueTree}, test_runner::TestRunner, }; +use tokio::time::timeout; use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; use zebra_chain::{ block::Block, - fmt::SummaryDebug, + fmt::{humantime_seconds, SummaryDebug}, history_tree::HistoryTree, parameters::{Network, NetworkUpgrade}, LedgerState, @@ -27,6 +28,9 @@ use crate::{ pub use zebra_chain::block::arbitrary::MAX_PARTIAL_CHAIN_BLOCKS; +/// How long we wait for chain tip updates before skipping them. +pub const CHAIN_TIP_UPDATE_WAIT_LIMIT: Duration = Duration::from_secs(2); + #[derive(Debug)] pub struct PreparedChainTree { chain: Arc>>, @@ -197,7 +201,7 @@ pub async fn populated_state( .into_iter() .map(|block| Request::CommitFinalizedBlock(block.into())); - let (state, read_state, latest_chain_tip, chain_tip_change) = + let (state, read_state, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network); let mut state = Buffer::new(BoxService::new(state), 1); @@ -209,7 +213,24 @@ pub async fn populated_state( } while let Some(rsp) = responses.next().await { - rsp.expect("blocks should commit just fine"); + // Wait for the block result and the chain tip update, + // which both happen in a separate thread from this one. 
+ rsp.expect("unexpected block commit failure"); + + // Wait for the chain tip update + if let Err(timeout_error) = timeout( + CHAIN_TIP_UPDATE_WAIT_LIMIT, + chain_tip_change.wait_for_tip_change(), + ) + .await + .map(|change_result| change_result.expect("unexpected chain tip update failure")) + { + info!( + timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT), + ?timeout_error, + "timeout waiting for chain tip change after committing block" + ); + } } (state, read_state, latest_chain_tip, chain_tip_change) diff --git a/zebra-state/src/service/block_iter.rs b/zebra-state/src/service/block_iter.rs index 73da14bf93b..90ea29d2b38 100644 --- a/zebra-state/src/service/block_iter.rs +++ b/zebra-state/src/service/block_iter.rs @@ -49,7 +49,7 @@ impl Iter<'_> { IterState::Finished => unreachable!(), }; - if let Some(block) = service.disk.db().block(hash_or_height) { + if let Some(block) = service.read_service.db.block(hash_or_height) { let height = block .coinbase_height() .expect("valid blocks have a coinbase height"); diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 5b6cae48911..515074b1909 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -17,7 +17,6 @@ use std::{ io::{stderr, stdout, Write}, - path::Path, sync::Arc, }; @@ -46,8 +45,11 @@ pub(super) use zebra_db::ZebraDb; /// The finalized part of the chain state, stored in the db. /// /// `rocksdb` allows concurrent writes through a shared reference, -/// so finalized state instances are cloneable. When the final clone is dropped, -/// the database is closed. +/// so clones of the finalized state represent the same database instance. +/// When the final clone is dropped, the database is closed. +/// +/// This is different from `NonFinalizedState::clone()`, +/// which returns an independent copy of the chains. #[derive(Clone, Debug, Eq, PartialEq)] pub struct FinalizedState { // Configuration @@ -72,7 +74,7 @@ pub struct FinalizedState { /// `rocksdb` allows reads and writes via a shared reference, /// so this database object can be freely cloned. /// The last instance that is dropped will close the underlying database. - db: ZebraDb, + pub db: ZebraDb, } impl FinalizedState { @@ -134,29 +136,19 @@ impl FinalizedState { self.network } - /// Returns the `Path` where the files used by this database are located. - pub fn path(&self) -> &Path { - self.db.path() - } - - /// Returns a reference to the inner database instance. - pub(crate) fn db(&self) -> &ZebraDb { - &self.db - } - /// Commit a finalized block to the state. /// /// It's the caller's responsibility to ensure that blocks are committed in /// order. 
pub fn commit_finalized( &mut self, - queued_block: QueuedFinalized, - ) -> Result<FinalizedBlock, ()> { - let (finalized, rsp_tx) = queued_block; + ordered_block: QueuedFinalized, + ) -> Result<FinalizedBlock, BoxError> { + let (finalized, rsp_tx) = ordered_block; let result = self.commit_finalized_direct(finalized.clone().into(), "CommitFinalized request"); - let block_result = if result.is_ok() { + if result.is_ok() { metrics::counter!("state.checkpoint.finalized.block.count", 1); metrics::gauge!( "state.checkpoint.finalized.block.height", @@ -171,21 +163,23 @@ impl FinalizedState { finalized.height.0 as f64, ); metrics::counter!("zcash.chain.verified.block.total", 1); - - Ok(finalized) } else { metrics::counter!("state.checkpoint.error.block.count", 1); metrics::gauge!( "state.checkpoint.error.block.height", finalized.height.0 as f64, ); - - Err(()) }; - let _ = rsp_tx.send(result.map_err(Into::into)); + // Some io errors can't be cloned, so we format them instead. + let owned_result = result + .as_ref() + .map(|_hash| finalized) + .map_err(|error| format!("{:?}", error).into()); + + let _ = rsp_tx.send(result); - block_result + owned_result } /// Immediately commit a `finalized` block to the finalized state. diff --git a/zebra-state/src/service/finalized_state/arbitrary.rs b/zebra-state/src/service/finalized_state/arbitrary.rs index 9ff59c7506e..516898b3794 100644 --- a/zebra-state/src/service/finalized_state/arbitrary.rs +++ b/zebra-state/src/service/finalized_state/arbitrary.rs @@ -14,7 +14,7 @@ impl Deref for FinalizedState { type Target = ZebraDb; fn deref(&self) -> &Self::Target { - self.db() + &self.db } } diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index d183b4159be..cc4e3317588 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -643,23 +643,49 @@ impl DiskDb { /// It should only be used in debugging or test code, immediately before a manual shutdown. /// /// TODO: make private after the stop height check has moved to the syncer (#3442) - /// move shutting down the database to a blocking thread (#2188), - /// and remove `force` and the manual flush + /// move shutting down the database to a blocking thread (#2188) pub(crate) fn shutdown(&mut self, force: bool) { - // Prevent a race condition where another thread clones the Arc, - // right after we've checked we're the only holder of the Arc. + // # Correctness // - // There is still a small race window after the guard is dropped, - // but if the race happens, it will only cause database errors during shutdown. - let clone_prevention_guard = Arc::get_mut(&mut self.db); + // If we're the only owner of the shared database instance, + // then there are no other threads that can increase the strong or weak count. + // + // ## Implementation Requirements + // + // This function and all functions that it calls should avoid cloning the shared database + // instance. If they do, they must drop it before: + // - shutting down database threads, or + // - deleting database files. 
+ let shared_database_owners = Arc::strong_count(&self.db) + Arc::weak_count(&self.db); - if clone_prevention_guard.is_none() && !force { - debug!( - "dropping cloned DiskDb, \ - but keeping shared database until the last reference is dropped", - ); + if shared_database_owners > 1 { + let path = self.path(); - return; + let mut ephemeral_note = ""; + + if force { + if self.ephemeral { + ephemeral_note = " and removing ephemeral files"; + } + + info!( + ?path, + "forcing shutdown{} of a state database with multiple active instances", + ephemeral_note, + ); + } else { + if self.ephemeral { + ephemeral_note = " and files"; + } + + debug!( + ?path, + "dropping DiskDb clone, \ + but keeping shared database instance{} until the last reference is dropped", + ephemeral_note, + ); + return; + } } self.assert_default_cf_is_empty(); @@ -670,17 +696,29 @@ impl DiskDb { // - the database flushes regularly anyway // - Zebra commits each block in a database transaction, any incomplete blocks get rolled back // - ephemeral files are placed in the os temp dir and should be cleaned up automatically eventually - info!("flushing database to disk"); - self.db.flush().expect("flush is successful"); + let path = self.path(); + info!(?path, "flushing database to disk"); + self.db + .flush() + .expect("unexpected failure flushing SST data to disk"); + self.db + .flush_wal(true) + .expect("unexpected failure flushing WAL data to disk"); - // But we should call `cancel_all_background_work` before Zebra exits. - // If we don't, we see these kinds of errors: + // We'd like to call `cancel_all_background_work()` before Zebra exits, + // but when we call it, we get memory, thread, or C++ errors when the process exits. + // (This seems to be a bug in RocksDB: cancel_all_background_work() should wait until + // all the threads have cleaned up.) + // + // We see these kinds of errors: // ``` // pthread lock: Invalid argument // pure virtual method called // terminate called without an active exception // pthread destroy mutex: Device or resource busy // Aborted (core dumped) + // signal: 6, SIGABRT: process abort signal + // signal: 11, SIGSEGV: invalid memory reference // ``` // // The RocksDB wiki says: @@ -690,8 +728,8 @@ impl DiskDb { // > You can speed up the waiting by calling CancelAllBackgroundWork(). // // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ - info!("stopping background database tasks"); - self.db.cancel_all_background_work(true); + //info!(?path, "stopping background database tasks"); + //self.db.cancel_all_background_work(true); // We'd like to drop the database before deleting its files, // because that closes the column families and the database correctly. @@ -705,57 +743,52 @@ impl DiskDb { // // https://github.com/facebook/rocksdb/wiki/Known-Issues // - // But our current code doesn't seem to cause any issues. - // We might want to explicitly drop the database as part of graceful shutdown (#1678). - self.delete_ephemeral(force); + // But this implementation doesn't seem to cause any issues, + // and the RocksDB Drop implementation handles any cleanup. + self.delete_ephemeral(); } - /// If the database is `ephemeral`, delete it. - /// - /// If `force` is true, clean up regardless of any shared references. - /// `force` can cause errors accessing the database from other shared references. - /// It should only be used in debugging or test code, immediately before a manual shutdown. 
- fn delete_ephemeral(&mut self, force: bool) { - if !self.ephemeral { - return; - } - - // Prevent a race condition where another thread clones the Arc, - // right after we've checked we're the only holder of the Arc. + /// If the database is `ephemeral`, delete its files. + fn delete_ephemeral(&mut self) { + // # Correctness // - // There is still a small race window after the guard is dropped, - // but if the race happens, it will only cause database errors during shutdown. - let clone_prevention_guard = Arc::get_mut(&mut self.db); - - if clone_prevention_guard.is_none() && !force { - debug!( - "dropping cloned DiskDb, \ - but keeping shared database files until the last reference is dropped", - ); + // This function and all functions that it calls should avoid cloning the shared database + // instance. See `shutdown()` for details. + if !self.ephemeral { return; } let path = self.path(); - info!(cache_path = ?path, "removing temporary database files"); + info!(?path, "removing temporary database files"); // We'd like to use `rocksdb::Env::mem_env` for ephemeral databases, // but the Zcash blockchain might not fit in memory. So we just // delete the database files instead. // - // We'd like to call `DB::destroy` here, but calling destroy on a + // We'd also like to call `DB::destroy` here, but calling destroy on a // live DB is undefined behaviour: // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ#basic-readwrite // // So we assume that all the database files are under `path`, and // delete them using standard filesystem APIs. Deleting open files // might cause errors on non-Unix platforms, so we ignore the result. - // (The OS will delete them eventually anyway.) - let res = std::fs::remove_dir_all(path); + // (The OS will delete them eventually anyway, if they are in a temporary directory.) + let result = std::fs::remove_dir_all(path); - // TODO: downgrade to debug once bugs like #2905 are fixed - // but leave any errors at "info" level - info!(?res, "removed temporary database files"); + if result.is_err() { + info!( + ?result, + ?path, + "removing temporary database files caused an error", + ); + } else { + debug!( + ?result, + ?path, + "successfully removed temporary database files", + ); + } } /// Check that the "default" column family is empty. @@ -764,6 +797,11 @@ impl DiskDb { /// /// If Zebra has a bug where it is storing data in the wrong column family. fn assert_default_cf_is_empty(&self) { + // # Correctness + // + // This function and all functions that it calls should avoid cloning the shared database + // instance. See `shutdown()` for details. + if let Some(default_cf) = self.cf_handle("default") { assert!( self.zs_is_empty(&default_cf), @@ -775,6 +813,9 @@ impl DiskDb { impl Drop for DiskDb { fn drop(&mut self) { + let path = self.path(); + debug!(?path, "dropping DiskDb instance"); + self.shutdown(false); } } diff --git a/zebra-state/src/service/non_finalized_state.rs b/zebra-state/src/service/non_finalized_state.rs index 0e34e23414a..fe887799304 100644 --- a/zebra-state/src/service/non_finalized_state.rs +++ b/zebra-state/src/service/non_finalized_state.rs @@ -30,7 +30,13 @@ mod tests; pub(crate) use chain::Chain; /// The state of the chains in memory, including queued blocks. -#[derive(Debug, Clone)] +/// +/// Clones of the non-finalized state contain independent copies of the chains. +/// This is different from `FinalizedState::clone()`, +/// which returns a shared reference to the database. +/// +/// Most chain data is clone-on-write using [`Arc`]. 
+#[derive(Clone, Debug)] pub struct NonFinalizedState { /// Verified, non-finalized chains, in ascending order. /// diff --git a/zebra-state/src/service/pending_utxos.rs b/zebra-state/src/service/pending_utxos.rs index 3742a721b5a..953aba4a97c 100644 --- a/zebra-state/src/service/pending_utxos.rs +++ b/zebra-state/src/service/pending_utxos.rs @@ -1,5 +1,6 @@ -use std::collections::HashMap; -use std::future::Future; +//! Pending UTXO tracker for [`AwaitUtxo` requests](crate::Request::AwaitUtxo). + +use std::{collections::HashMap, future::Future}; use tokio::sync::broadcast; diff --git a/zebra-state/src/service/queued_blocks.rs b/zebra-state/src/service/queued_blocks.rs index ac0270c9276..4cfa12d89ec 100644 --- a/zebra-state/src/service/queued_blocks.rs +++ b/zebra-state/src/service/queued_blocks.rs @@ -77,6 +77,12 @@ impl QueuedBlocks { self.update_metrics(); } + /// Returns `true` if there are any queued children of `parent_hash`. + #[instrument(skip(self), fields(%parent_hash))] + pub fn has_queued_children(&self, parent_hash: block::Hash) -> bool { + self.by_parent.contains_key(&parent_hash) + } + /// Dequeue and return all blocks that were waiting for the arrival of /// `parent`. #[instrument(skip(self), fields(%parent_hash))] diff --git a/zebra-state/src/service/read/address/balance.rs b/zebra-state/src/service/read/address/balance.rs index a6a7c7facbf..cff2cf2255b 100644 --- a/zebra-state/src/service/read/address/balance.rs +++ b/zebra-state/src/service/read/address/balance.rs @@ -2,8 +2,9 @@ //! //! In the functions in this module: //! -//! The StateService commits blocks to the finalized state before updating -//! `chain` from the latest chain. Then it can commit additional blocks to +//! The block write task commits blocks to the finalized state before updating +//! `chain` with a cached copy of the best non-finalized chain from +//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to //! the finalized state after we've cloned the `chain`. //! //! This means that some blocks can be in both: diff --git a/zebra-state/src/service/read/address/tx_id.rs b/zebra-state/src/service/read/address/tx_id.rs index 86d055eafd4..f35bb39104f 100644 --- a/zebra-state/src/service/read/address/tx_id.rs +++ b/zebra-state/src/service/read/address/tx_id.rs @@ -2,8 +2,9 @@ //! //! In the functions in this module: //! -//! The StateService commits blocks to the finalized state before updating -//! `chain` from the latest chain. Then it can commit additional blocks to +//! The block write task commits blocks to the finalized state before updating +//! `chain` with a cached copy of the best non-finalized chain from +//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to //! the finalized state after we've cloned the `chain`. //! //! This means that some blocks can be in both: diff --git a/zebra-state/src/service/read/address/utxo.rs b/zebra-state/src/service/read/address/utxo.rs index 886b3df6c87..2e12213ddb2 100644 --- a/zebra-state/src/service/read/address/utxo.rs +++ b/zebra-state/src/service/read/address/utxo.rs @@ -2,8 +2,9 @@ //! //! In the functions in this module: //! -//! The StateService commits blocks to the finalized state before updating -//! `chain` from the latest chain. Then it can commit additional blocks to +//! The block write task commits blocks to the finalized state before updating +//! `chain` with a cached copy of the best non-finalized chain from +//! `NonFinalizedState.chain_set`. 
Then the block commit task can commit additional blocks to //! the finalized state after we've cloned the `chain`. //! //! This means that some blocks can be in both: diff --git a/zebra-state/src/service/read/block.rs b/zebra-state/src/service/read/block.rs index 942fe40f4fa..985c44eaf94 100644 --- a/zebra-state/src/service/read/block.rs +++ b/zebra-state/src/service/read/block.rs @@ -2,8 +2,9 @@ //! //! In the functions in this module: //! -//! The StateService commits blocks to the finalized state before updating -//! `chain` or `non_finalized_state` from the latest chains. Then it can +//! The block write task commits blocks to the finalized state before updating +//! `chain` or `non_finalized_state` with a cached copy of the non-finalized chains +//! in `NonFinalizedState.chain_set`. Then the block commit task can //! commit additional blocks to the finalized state after we've cloned the //! `chain` or `non_finalized_state`. //! diff --git a/zebra-state/src/service/read/find.rs b/zebra-state/src/service/read/find.rs index 2459500d279..50c7bf7d1a4 100644 --- a/zebra-state/src/service/read/find.rs +++ b/zebra-state/src/service/read/find.rs @@ -2,8 +2,9 @@ //! //! In the functions in this module: //! -//! The StateService commits blocks to the finalized state before updating -//! `chain` from the latest chain. Then it can commit additional blocks to +//! The block write task commits blocks to the finalized state before updating +//! `chain` with a cached copy of the best non-finalized chain from +//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to //! the finalized state after we've cloned the `chain`. //! //! This means that some blocks can be in both: diff --git a/zebra-state/src/service/read/tree.rs b/zebra-state/src/service/read/tree.rs index 7c3e69da497..09897a93782 100644 --- a/zebra-state/src/service/read/tree.rs +++ b/zebra-state/src/service/read/tree.rs @@ -2,8 +2,9 @@ //! //! In the functions in this module: //! -//! The StateService commits blocks to the finalized state before updating -//! `chain` from the latest chain. Then it can commit additional blocks to +//! The block write task commits blocks to the finalized state before updating +//! `chain` with a cached copy of the best non-finalized chain from +//! `NonFinalizedState.chain_set`. Then the block commit task can commit additional blocks to //! the finalized state after we've cloned the `chain`. //! //! This means that some blocks can be in both: diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index b0987b48b43..fbb7b23632c 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -2,7 +2,7 @@ //! //! TODO: move these tests into tests::vectors and tests::prop modules. -use std::{env, sync::Arc}; +use std::{env, sync::Arc, time::Duration}; use tower::{buffer::Buffer, util::BoxService}; @@ -386,59 +386,6 @@ proptest! { prop_assert_eq!(response, Ok(())); } - /// Test that the best tip height is updated accordingly. - /// - /// 1. Generate a finalized chain and some non-finalized blocks. - /// 2. Check that initially the best tip height is empty. - /// 3. Commit the finalized blocks and check that the best tip height is updated accordingly. - /// 4. Commit the non-finalized blocks and check that the best tip height is also updated - /// accordingly. 
- #[test] - fn chain_tip_sender_is_updated( - (network, finalized_blocks, non_finalized_blocks) - in continuous_empty_blocks_from_test_vectors(), - ) { - let _init_guard = zebra_test::init(); - - let (mut state_service, _read_only_state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network); - - prop_assert_eq!(latest_chain_tip.best_tip_height(), None); - prop_assert_eq!(chain_tip_change.last_tip_change(), None); - - for block in finalized_blocks { - let expected_block = block.clone(); - - let expected_action = if expected_block.height <= block::Height(1) { - // 0: reset by both initialization and the Genesis network upgrade - // 1: reset by the BeforeOverwinter network upgrade - TipAction::reset_with(expected_block.clone().into()) - } else { - TipAction::grow_with(expected_block.clone().into()) - }; - - state_service.queue_and_commit_finalized(block); - - prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height)); - prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action)); - } - - for block in non_finalized_blocks { - let expected_block = block.clone(); - - let expected_action = if expected_block.height == block::Height(1) { - // 1: reset by the BeforeOverwinter network upgrade - TipAction::reset_with(expected_block.clone().into()) - } else { - TipAction::grow_with(expected_block.clone().into()) - }; - - state_service.queue_and_commit_non_finalized(block); - - prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height)); - prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action)); - } - } - /// Test that the value pool is updated accordingly. /// /// 1. Generate a finalized chain and some non-finalized blocks. @@ -476,7 +423,10 @@ proptest! { expected_finalized_value_pool += *block_value_pool; } - state_service.queue_and_commit_finalized(block.clone()); + let result_receiver = state_service.queue_and_commit_finalized(block.clone()); + let result = result_receiver.blocking_recv(); + + prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result); prop_assert_eq!( state_service.disk.finalized_value_pool(), @@ -499,7 +449,10 @@ proptest! { let block_value_pool = &block.block.chain_value_pool_change(&transparent::utxos_from_ordered_utxos(utxos))?; expected_non_finalized_value_pool += *block_value_pool; - state_service.queue_and_commit_non_finalized(block.clone()); + let result_receiver = state_service.queue_and_commit_non_finalized(block.clone()); + let result = result_receiver.blocking_recv(); + + prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result); prop_assert_eq!( state_service.mem.best_chain().unwrap().chain_value_pools, @@ -518,6 +471,80 @@ proptest! { } } +// This test sleeps for every block, so we only ever want to run it once +proptest! { + #![proptest_config( + proptest::test_runner::Config::with_cases(1) + )] + + /// Test that the best tip height is updated accordingly. + /// + /// 1. Generate a finalized chain and some non-finalized blocks. + /// 2. Check that initially the best tip height is empty. + /// 3. Commit the finalized blocks and check that the best tip height is updated accordingly. + /// 4. Commit the non-finalized blocks and check that the best tip height is also updated + /// accordingly. 
+ #[test] + fn chain_tip_sender_is_updated( + (network, finalized_blocks, non_finalized_blocks) + in continuous_empty_blocks_from_test_vectors(), + ) { + let _init_guard = zebra_test::init(); + + let (mut state_service, _read_only_state_service, latest_chain_tip, mut chain_tip_change) = StateService::new(Config::ephemeral(), network); + + prop_assert_eq!(latest_chain_tip.best_tip_height(), None); + prop_assert_eq!(chain_tip_change.last_tip_change(), None); + + for block in finalized_blocks { + let expected_block = block.clone(); + + let expected_action = if expected_block.height <= block::Height(1) { + // 0: reset by both initialization and the Genesis network upgrade + // 1: reset by the BeforeOverwinter network upgrade + TipAction::reset_with(expected_block.clone().into()) + } else { + TipAction::grow_with(expected_block.clone().into()) + }; + + let result_receiver = state_service.queue_and_commit_finalized(block); + let result = result_receiver.blocking_recv(); + + prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result); + + // Wait for the channels to be updated by the block commit task. + // TODO: add a blocking method on ChainTipChange + std::thread::sleep(Duration::from_secs(1)); + + prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height)); + prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action)); + } + + for block in non_finalized_blocks { + let expected_block = block.clone(); + + let expected_action = if expected_block.height == block::Height(1) { + // 1: reset by the BeforeOverwinter network upgrade + TipAction::reset_with(expected_block.clone().into()) + } else { + TipAction::grow_with(expected_block.clone().into()) + }; + + let result_receiver = state_service.queue_and_commit_non_finalized(block); + let result = result_receiver.blocking_recv(); + + prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result); + + // Wait for the channels to be updated by the block commit task. + // TODO: add a blocking method on ChainTipChange + std::thread::sleep(Duration::from_secs(1)); + + prop_assert_eq!(latest_chain_tip.best_tip_height(), Some(expected_block.height)); + prop_assert_eq!(chain_tip_change.last_tip_change(), Some(expected_action)); + } + } +} + /// Test strategy to generate a chain split in two from the test vectors. /// /// Selects either the mainnet or testnet chain test vector and randomly splits the chain in two diff --git a/zebra-state/src/service/write.rs b/zebra-state/src/service/write.rs new file mode 100644 index 00000000000..f3aa6e86f12 --- /dev/null +++ b/zebra-state/src/service/write.rs @@ -0,0 +1,138 @@ +//! Writing blocks to the finalized and non-finalized states. + +use std::sync::{Arc, Mutex}; + +use zebra_chain::block::{self, Height}; + +use crate::service::{ + finalized_state::FinalizedState, + queued_blocks::{QueuedFinalized, QueuedNonFinalized}, + ChainTipBlock, ChainTipSender, +}; + +/// Reads blocks from the channels, writes them to the `finalized_state`, +/// and updates the `chain_tip_sender`. 
+///
+/// TODO: pass the non-finalized state and associated update channel to this function
+#[instrument(skip(
+    finalized_block_write_receiver,
+    non_finalized_block_write_receiver,
+    invalid_block_reset_sender,
+    chain_tip_sender
+))]
+pub fn write_blocks_from_channels(
+    mut finalized_block_write_receiver: tokio::sync::mpsc::UnboundedReceiver<QueuedFinalized>,
+    mut non_finalized_block_write_receiver: tokio::sync::mpsc::UnboundedReceiver<
+        QueuedNonFinalized,
+    >,
+    mut finalized_state: FinalizedState,
+    invalid_block_reset_sender: tokio::sync::mpsc::UnboundedSender<block::Hash>,
+    chain_tip_sender: Arc<Mutex<ChainTipSender>>,
+) {
+    // Write all the finalized blocks sent by the state,
+    // until the state closes the finalized block channel's sender.
+    while let Some(ordered_block) = finalized_block_write_receiver.blocking_recv() {
+        // TODO: split these checks into separate functions
+
+        if invalid_block_reset_sender.is_closed() {
+            info!("StateService closed the block reset channel. Is Zebra shutting down?");
+            return;
+        }
+
+        // Discard any children of invalid blocks in the channel
+        //
+        // `commit_finalized()` requires blocks in height order.
+        // So if there has been a block commit error,
+        // we need to drop all the descendants of that block,
+        // until we receive a block at the required next height.
+        let next_valid_height = finalized_state
+            .db
+            .finalized_tip_height()
+            .map(|height| (height + 1).expect("committed heights are valid"))
+            .unwrap_or(Height(0));
+
+        if ordered_block.0.height != next_valid_height {
+            debug!(
+                ?next_valid_height,
+                invalid_height = ?ordered_block.0.height,
+                invalid_hash = ?ordered_block.0.hash,
+                "got a block that was the wrong height. \
+                 Assuming a parent block failed, and dropping this block",
+            );
+
+            // We don't want to send a reset here, because it could overwrite a valid sent hash
+            std::mem::drop(ordered_block);
+            continue;
+        }
+
+        // Try committing the block
+        match finalized_state.commit_finalized(ordered_block) {
+            Ok(finalized) => {
+                let tip_block = ChainTipBlock::from(finalized);
+
+                // TODO: update the chain tip sender with non-finalized blocks in this function,
+                // and get rid of the mutex
+                chain_tip_sender
+                    .lock()
+                    .expect("unexpected panic in block commit task or state")
+                    .set_finalized_tip(tip_block);
+            }
+            Err(error) => {
+                let finalized_tip = finalized_state.db.tip();
+
+                // The last block in the queue failed, so we can't commit the next block.
+                // Instead, we need to reset the state queue,
+                // and discard any children of the invalid block in the channel.
+                info!(
+                    ?error,
+                    last_valid_height = ?finalized_tip.map(|tip| tip.0),
+                    last_valid_hash = ?finalized_tip.map(|tip| tip.1),
+                    "committing a block to the finalized state failed, resetting state queue",
+                );
+
+                let send_result =
+                    invalid_block_reset_sender.send(finalized_state.db.finalized_tip_hash());
+
+                if send_result.is_err() {
+                    info!("StateService closed the block reset channel. Is Zebra shutting down?");
+                    return;
+                }
+            }
+        }
+    }
+
+    // Do this check even if the channel got closed before any finalized blocks were sent.
+    // This can happen if we're past the finalized tip.
+    if invalid_block_reset_sender.is_closed() {
+        info!("StateService closed the block reset channel. Is Zebra shutting down?");
+        return;
+    }
+
+    // Write all the non-finalized blocks sent by the state, until Zebra shuts down.
+    while let Some(_block) = non_finalized_block_write_receiver.blocking_recv() {
+        if invalid_block_reset_sender.is_closed() {
+            info!("StateService closed the block reset channel. 
Is Zebra shutting down?"); + return; + } + + // TODO: + // - read from the channel + // - commit blocks to the non-finalized state + // - if there are any ready, commit blocks to the finalized state + // - handle errors by sending a reset with all the block hashes in the non-finalized state, and the finalized tip + // - update the chain tip sender and cached non-finalized state + error!("handle non-finalized block writes here"); + } + + // We're finished receiving non-finalized blocks from the state. + // + // TODO: + // - make the task an object, and do this in the drop impl? + // - does the drop order matter here? + non_finalized_block_write_receiver.close(); + std::mem::drop(non_finalized_block_write_receiver); + + // We're done writing to the finalized state, so we can force it to shut down. + finalized_state.db.shutdown(true); + std::mem::drop(finalized_state); +} diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index 9f8570165b2..8962ecad4e4 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -10,13 +10,14 @@ use std::{ }; use futures::FutureExt; -use tokio::{sync::oneshot, task::JoinHandle}; +use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; use tower::{buffer::Buffer, builder::ServiceBuilder, util::BoxService, Service, ServiceExt}; use tracing::Span; use zebra_chain::{ amount::Amount, block::Block, + fmt::humantime_seconds, parameters::Network::{self, *}, serialization::ZcashDeserializeInto, transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx}, @@ -24,7 +25,7 @@ use zebra_chain::{ use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; use zebra_network::{AddressBook, InventoryResponse, Request, Response}; use zebra_node_services::mempool; -use zebra_state::Config as StateConfig; +use zebra_state::{ChainTipChange, Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT}; use zebra_test::mock_service::{MockService, PanicAssertion}; use crate::{ @@ -59,6 +60,7 @@ async fn mempool_requests_for_transactions() { _mock_tx_verifier, mut peer_set, _state_guard, + _chain_tip_change, sync_gossip_task_handle, tx_gossip_task_handle, ) = setup(true).await; @@ -142,6 +144,7 @@ async fn mempool_push_transaction() -> Result<(), crate::BoxError> { mut tx_verifier, mut peer_set, _state_guard, + _chain_tip_change, sync_gossip_task_handle, tx_gossip_task_handle, ) = setup(false).await; @@ -236,6 +239,7 @@ async fn mempool_advertise_transaction_ids() -> Result<(), crate::BoxError> { mut tx_verifier, mut peer_set, _state_guard, + _chain_tip_change, sync_gossip_task_handle, tx_gossip_task_handle, ) = setup(false).await; @@ -342,6 +346,7 @@ async fn mempool_transaction_expiration() -> Result<(), crate::BoxError> { mut tx_verifier, mut peer_set, state_service, + _chain_tip_change, sync_gossip_task_handle, tx_gossip_task_handle, ) = setup(false).await; @@ -638,6 +643,7 @@ async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> { mut tx_verifier, mut peer_set, state_service, + mut chain_tip_change, sync_gossip_task_handle, tx_gossip_task_handle, ) = setup(false).await; @@ -658,7 +664,20 @@ async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> { .await .respond(Response::Blocks(vec![Available(block)])); - // TODO: check that the block is queued in the checkpoint verifier + // Wait for the chain tip update + if let Err(timeout_error) = timeout( + CHAIN_TIP_UPDATE_WAIT_LIMIT, + 
chain_tip_change.wait_for_tip_change(), + ) + .await + .map(|change_result| change_result.expect("unexpected chain tip update failure")) + { + info!( + timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT), + ?timeout_error, + "timeout waiting for chain tip change after committing block" + ); + } // check that nothing unexpected happened peer_set.expect_no_requests().await; @@ -729,6 +748,7 @@ async fn setup( MockService, MockService, Buffer, zebra_state::Request>, + ChainTipChange, JoinHandle>, JoinHandle>, ) { @@ -744,7 +764,7 @@ async fn setup( ); let address_book = Arc::new(std::sync::Mutex::new(address_book)); let (sync_status, mut recent_syncs) = SyncStatus::new(); - let (state, _read_only_state_service, latest_chain_tip, chain_tip_change) = + let (state, _read_only_state_service, latest_chain_tip, mut chain_tip_change) = zebra_state::init(state_config.clone(), network); let mut state_service = ServiceBuilder::new().buffer(1).service(state); @@ -786,6 +806,21 @@ async fn setup( .unwrap(); committed_blocks.push(genesis_block); + // Wait for the chain tip update + if let Err(timeout_error) = timeout( + CHAIN_TIP_UPDATE_WAIT_LIMIT, + chain_tip_change.wait_for_tip_change(), + ) + .await + .map(|change_result| change_result.expect("unexpected chain tip update failure")) + { + info!( + timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT), + ?timeout_error, + "timeout waiting for chain tip change after committing block" + ); + } + // Also push block 1. // Block one is a network upgrade and the mempool will be cleared at it, // let all our tests start after this event. @@ -801,6 +836,8 @@ async fn setup( .unwrap(); committed_blocks.push(block_one); + // Don't wait for the chain tip update here, we wait for AdvertiseBlock below + let (mut mempool_service, transaction_receiver) = Mempool::new( &MempoolConfig::default(), buffered_peer_set.clone(), @@ -845,7 +882,7 @@ async fn setup( let sync_gossip_task_handle = tokio::spawn(sync::gossip_best_tip_block_hashes( sync_status.clone(), - chain_tip_change, + chain_tip_change.clone(), peer_set.clone(), )); @@ -873,6 +910,7 @@ async fn setup( mock_tx_verifier, peer_set, state_service, + chain_tip_change, sync_gossip_task_handle, tx_gossip_task_handle, ) diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 06ca8eaa759..1ecb041937a 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -3,20 +3,23 @@ use std::{collections::HashSet, sync::Arc}; use color_eyre::Report; -use tokio::time; +use tokio::time::{self, timeout}; use tower::{ServiceBuilder, ServiceExt}; -use zebra_chain::{block::Block, parameters::Network, serialization::ZcashDeserializeInto}; +use zebra_chain::{ + block::Block, fmt::humantime_seconds, parameters::Network, serialization::ZcashDeserializeInto, +}; use zebra_consensus::transaction as tx; -use zebra_state::Config as StateConfig; +use zebra_state::{Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT}; use zebra_test::mock_service::{MockService, PanicAssertion}; -use super::UnboxMempoolError; use crate::components::{ mempool::{self, storage::tests::unmined_transactions_in_blocks, *}, sync::RecentSyncLengths, }; +use super::UnboxMempoolError; + /// A [`MockService`] representing the network service. type MockPeerSet = MockService; @@ -51,7 +54,7 @@ async fn mempool_service_basic_single() -> Result<(), Report> { // inserted except one (the genesis block transaction). 
let cost_limit = more_transactions.iter().map(|tx| tx.cost()).sum(); - let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = + let (mut service, _peer_set, _state_service, _chain_tip_change, _tx_verifier, mut recent_syncs) = setup(network, cost_limit).await; // Enable the mempool @@ -198,7 +201,7 @@ async fn mempool_queue_single() -> Result<(), Report> { .map(|tx| tx.cost()) .sum(); - let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = + let (mut service, _peer_set, _state_service, _chain_tip_change, _tx_verifier, mut recent_syncs) = setup(network, cost_limit).await; // Enable the mempool @@ -272,7 +275,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let (mut service, _peer_set, _state_service, _tx_verifier, mut recent_syncs) = + let (mut service, _peer_set, _state_service, _chain_tip_change, _tx_verifier, mut recent_syncs) = setup(network, u64::MAX).await; // get the genesis block transactions from the Zcash blockchain. @@ -387,8 +390,14 @@ async fn mempool_cancel_mined() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) = - setup(network, u64::MAX).await; + let ( + mut mempool, + _peer_set, + mut state_service, + _chain_tip_change, + _tx_verifier, + mut recent_syncs, + ) = setup(network, u64::MAX).await; // Enable the mempool mempool.enable(&mut recent_syncs).await; @@ -480,8 +489,14 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> // Using the mainnet for now let network = Network::Mainnet; - let (mut mempool, _peer_set, mut state_service, _tx_verifier, mut recent_syncs) = - setup(network, u64::MAX).await; + let ( + mut mempool, + _peer_set, + mut state_service, + mut chain_tip_change, + _tx_verifier, + mut recent_syncs, + ) = setup(network, u64::MAX).await; // Enable the mempool mempool.enable(&mut recent_syncs).await; @@ -501,6 +516,21 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> .await .unwrap(); + // Wait for the chain tip update + if let Err(timeout_error) = timeout( + CHAIN_TIP_UPDATE_WAIT_LIMIT, + chain_tip_change.wait_for_tip_change(), + ) + .await + .map(|change_result| change_result.expect("unexpected chain tip update failure")) + { + info!( + timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT), + ?timeout_error, + "timeout waiting for chain tip change after committing block" + ); + } + // Queue transaction from block 2 for download let txid = block2.transactions[0].unmined_id(); let response = mempool @@ -533,6 +563,21 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> .await .unwrap(); + // Wait for the chain tip update + if let Err(timeout_error) = timeout( + CHAIN_TIP_UPDATE_WAIT_LIMIT, + chain_tip_change.wait_for_tip_change(), + ) + .await + .map(|change_result| change_result.expect("unexpected chain tip update failure")) + { + info!( + timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT), + ?timeout_error, + "timeout waiting for chain tip change after committing block" + ); + } + // Query the mempool to make it poll chain_tip_change mempool.dummy_call().await; @@ -548,8 +593,14 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { // Using the mainnet for now let network = Network::Mainnet; - let (mut mempool, _peer_set, _state_service, mut tx_verifier, mut recent_syncs) = - setup(network, 
u64::MAX).await;
+    let (
+        mut mempool,
+        _peer_set,
+        _state_service,
+        _chain_tip_change,
+        mut tx_verifier,
+        mut recent_syncs,
+    ) = setup(network, u64::MAX).await;
     // Get transactions to use in the test
     let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
@@ -617,8 +668,14 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
     // Using the mainnet for now
     let network = Network::Mainnet;
-    let (mut mempool, mut peer_set, _state_service, _tx_verifier, mut recent_syncs) =
-        setup(network, u64::MAX).await;
+    let (
+        mut mempool,
+        mut peer_set,
+        _state_service,
+        _chain_tip_change,
+        _tx_verifier,
+        mut recent_syncs,
+    ) = setup(network, u64::MAX).await;
     // Get transactions to use in the test
     let mut unmined_transactions = unmined_transactions_in_blocks(1..=2, network);
@@ -688,6 +745,7 @@ async fn setup(
     Mempool,
     MockPeerSet,
     StateService,
+    ChainTipChange,
     MockTxVerifier,
     RecentSyncLengths,
 ) {
@@ -712,8 +770,15 @@ async fn setup(
         Buffer::new(BoxService::new(tx_verifier.clone()), 1),
         sync_status,
         latest_chain_tip,
-        chain_tip_change,
+        chain_tip_change.clone(),
     );
-    (mempool, peer_set, state_service, tx_verifier, recent_syncs)
+    (
+        mempool,
+        peer_set,
+        state_service,
+        chain_tip_change,
+        tx_verifier,
+        recent_syncs,
+    )
 }
diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs
index 35f20db4c77..655cfd9b5bf 100644
--- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs
+++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs
@@ -176,8 +176,13 @@ pub async fn run() -> Result<()> {
         assert_eq!(response, expected_response);
     }
-    tracing::info!("waiting for mempool to verify some transactions...");
-    zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
+    // The timing of verification logs is unreliable, so we've disabled this check for now.
+    //
+    // TODO: when lightwalletd starts returning transactions again:
+    // re-enable this check, find a better way to check, or delete this commented-out check
+    //
+    //tracing::info!("waiting for mempool to verify some transactions...");
+    //zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?;
     tracing::info!("calling GetMempoolTx gRPC to fetch transactions...");
     let mut transactions_stream = rpc_client