diff --git a/zebra-consensus/src/checkpoint.rs b/zebra-consensus/src/checkpoint.rs index b575d79d8b7..2334383b76a 100644 --- a/zebra-consensus/src/checkpoint.rs +++ b/zebra-consensus/src/checkpoint.rs @@ -970,7 +970,7 @@ pub enum VerifyCheckpointError { #[error("checkpoint verifier was dropped")] Dropped, #[error(transparent)] - CommitFinalized(BoxError), + CommitCheckpointVerified(BoxError), #[error(transparent)] Tip(BoxError), #[error(transparent)] @@ -1084,19 +1084,19 @@ where // we don't reject the entire checkpoint. // Instead, we reset the verifier to the successfully committed state tip. let state_service = self.state_service.clone(); - let commit_finalized_block = tokio::spawn(async move { + let commit_checkpoint_verified = tokio::spawn(async move { let hash = req_block .rx .await .map_err(Into::into) - .map_err(VerifyCheckpointError::CommitFinalized) + .map_err(VerifyCheckpointError::CommitCheckpointVerified) .expect("CheckpointVerifier does not leave dangling receivers")?; // We use a `ServiceExt::oneshot`, so that every state service // `poll_ready` has a corresponding `call`. See #1593. match state_service .oneshot(zs::Request::CommitCheckpointVerifiedBlock(req_block.block)) - .map_err(VerifyCheckpointError::CommitFinalized) + .map_err(VerifyCheckpointError::CommitCheckpointVerified) .await? { zs::Response::Committed(committed_hash) => { @@ -1110,10 +1110,10 @@ where let state_service = self.state_service.clone(); let reset_sender = self.reset_sender.clone(); async move { - let result = commit_finalized_block.await; + let result = commit_checkpoint_verified.await; // Avoid a panic on shutdown // - // When `zebrad` is terminated using Ctrl-C, the `commit_finalized_block` task + // When `zebrad` is terminated using Ctrl-C, the `commit_checkpoint_verified` task // can return a `JoinError::Cancelled`. We expect task cancellation on shutdown, // so we don't need to panic here. The persistent state is correct even when the // task is cancelled, because block data is committed inside transactions, in @@ -1121,7 +1121,7 @@ where let result = if zebra_chain::shutdown::is_shutting_down() { Err(VerifyCheckpointError::ShuttingDown) } else { - result.expect("commit_finalized_block should not panic") + result.expect("commit_checkpoint_verified should not panic") }; if result.is_err() { // If there was an error committing the block, then this verifier diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 2f229da9908..75d1f0ac1ca 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -86,7 +86,7 @@ mod tests; pub use finalized_state::{OutputIndex, OutputLocation, TransactionLocation}; -use self::queued_blocks::{QueuedFinalized, QueuedNonFinalized, SentHashes}; +use self::queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified, SentHashes}; /// A read-write service for Zebra's cached blockchain state. /// @@ -124,25 +124,26 @@ pub(crate) struct StateService { // /// Queued blocks for the [`NonFinalizedState`] that arrived out of order. /// These blocks are awaiting their parent blocks before they can do contextual verification. - queued_non_finalized_blocks: QueuedBlocks, + non_finalized_state_queued_blocks: QueuedBlocks, /// Queued blocks for the [`FinalizedState`] that arrived out of order. /// These blocks are awaiting their parent blocks before they can do contextual verification. /// /// Indexed by their parent block hash. - queued_finalized_blocks: HashMap, + finalized_state_queued_blocks: HashMap, /// A channel to send blocks to the `block_write_task`, /// so they can be written to the [`NonFinalizedState`]. non_finalized_block_write_sender: - Option>, + Option>, /// A channel to send blocks to the `block_write_task`, /// so they can be written to the [`FinalizedState`]. /// /// This sender is dropped after the state has finished sending all the checkpointed blocks, /// and the lowest non-finalized block arrives. - finalized_block_write_sender: Option>, + finalized_block_write_sender: + Option>, /// The [`block::Hash`] of the most recent block sent on /// `finalized_block_write_sender` or `non_finalized_block_write_sender`. @@ -151,25 +152,25 @@ pub(crate) struct StateService { /// - the finalized tip, if there are stored blocks, or /// - the genesis block's parent hash, if the database is empty. /// - /// If `invalid_block_reset_receiver` gets a reset, this is: + /// If `invalid_block_write_reset_receiver` gets a reset, this is: /// - the hash of the last valid committed block (the parent of the invalid block). // // TODO: // - turn this into an IndexMap containing recent non-finalized block hashes and heights // (they are all potential tips) // - remove block hashes once their heights are strictly less than the finalized tip - last_sent_finalized_block_hash: block::Hash, + finalized_block_write_last_sent_hash: block::Hash, /// A set of block hashes that have been sent to the block write task. /// Hashes of blocks below the finalized tip height are periodically pruned. - sent_non_finalized_block_hashes: SentHashes, + non_finalized_block_write_sent_hashes: SentHashes, /// If an invalid block is sent on `finalized_block_write_sender` /// or `non_finalized_block_write_sender`, /// this channel gets the [`block::Hash`] of the valid tip. // // TODO: add tests for finalized and non-finalized resets (#2654) - invalid_block_reset_receiver: tokio::sync::mpsc::UnboundedReceiver, + invalid_block_write_reset_receiver: tokio::sync::mpsc::UnboundedReceiver, // Pending UTXO Request Tracking // @@ -188,11 +189,11 @@ pub(crate) struct StateService { // Metrics // - /// A metric tracking the maximum height that's currently in `queued_finalized_blocks` + /// A metric tracking the maximum height that's currently in `finalized_state_queued_blocks` /// - /// Set to `f64::NAN` if `queued_finalized_blocks` is empty, because grafana shows NaNs + /// Set to `f64::NAN` if `finalized_state_queued_blocks` is empty, because grafana shows NaNs /// as a break in the graph. - max_queued_finalized_height: f64, + max_finalized_queue_height: f64, } /// A read-only service for accessing Zebra's cached blockchain state. @@ -245,16 +246,16 @@ impl Drop for StateService { // Close the channels (non-blocking) // This makes the block write thread exit the next time it checks the channels. // We want to do this here so we get any errors or panics from the block write task before it shuts down. - self.invalid_block_reset_receiver.close(); + self.invalid_block_write_reset_receiver.close(); std::mem::drop(self.finalized_block_write_sender.take()); std::mem::drop(self.non_finalized_block_write_sender.take()); self.clear_finalized_block_queue( - "dropping the state: dropped unused queued finalized block", + "dropping the state: dropped unused finalized state queue block", ); self.clear_non_finalized_block_queue( - "dropping the state: dropped unused queued non-finalized block", + "dropping the state: dropped unused non-finalized state queue block", ); // Then drop self.read_service, which checks the block write task for panics, @@ -364,7 +365,7 @@ impl StateService { tokio::sync::mpsc::unbounded_channel(); let (finalized_block_write_sender, finalized_block_write_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (invalid_block_reset_sender, invalid_block_reset_receiver) = + let (invalid_block_reset_sender, invalid_block_write_reset_receiver) = tokio::sync::mpsc::unbounded_channel(); let finalized_state_for_writing = finalized_state.clone(); @@ -396,25 +397,25 @@ impl StateService { let full_verifier_utxo_lookahead = full_verifier_utxo_lookahead.expect("unexpected negative height"); - let queued_non_finalized_blocks = QueuedBlocks::default(); + let non_finalized_state_queued_blocks = QueuedBlocks::default(); let pending_utxos = PendingUtxos::default(); - let last_sent_finalized_block_hash = finalized_state.db.finalized_tip_hash(); + let finalized_block_write_last_sent_hash = finalized_state.db.finalized_tip_hash(); let state = Self { network, full_verifier_utxo_lookahead, - queued_non_finalized_blocks, - queued_finalized_blocks: HashMap::new(), + non_finalized_state_queued_blocks, + finalized_state_queued_blocks: HashMap::new(), non_finalized_block_write_sender: Some(non_finalized_block_write_sender), finalized_block_write_sender: Some(finalized_block_write_sender), - last_sent_finalized_block_hash, - sent_non_finalized_block_hashes: SentHashes::default(), - invalid_block_reset_receiver, + finalized_block_write_last_sent_hash, + non_finalized_block_write_sent_hashes: SentHashes::default(), + invalid_block_write_reset_receiver, pending_utxos, last_prune: Instant::now(), read_service: read_service.clone(), - max_queued_finalized_height: f64::NAN, + max_finalized_queue_height: f64::NAN, }; timer.finish(module_path!(), line!(), "initializing state service"); @@ -457,7 +458,7 @@ impl StateService { /// Queue a finalized block for verification and storage in the finalized state. /// /// Returns a channel receiver that provides the result of the block commit. - fn queue_and_commit_finalized( + fn queue_and_commit_to_finalized_state( &mut self, checkpoint_verified: CheckpointVerifiedBlock, ) -> oneshot::Receiver> { @@ -472,7 +473,7 @@ impl StateService { // If we're close to the final checkpoint, make the block's UTXOs available for // full verification of non-finalized blocks, even when it is in the channel. if self.is_close_to_final_checkpoint(queued_height) { - self.sent_non_finalized_block_hashes + self.non_finalized_block_write_sent_hashes .add_finalized(&checkpoint_verified) } @@ -482,23 +483,23 @@ impl StateService { if self.finalized_block_write_sender.is_some() { // We're still committing finalized blocks if let Some(duplicate_queued) = self - .queued_finalized_blocks + .finalized_state_queued_blocks .insert(queued_prev_hash, queued) { - Self::send_finalized_block_error( + Self::send_checkpoint_verified_block_error( duplicate_queued, "dropping older finalized block: got newer duplicate block", ); } - self.drain_queue_and_commit_finalized(); + self.drain_finalized_queue_and_commit(); } else { // We've finished committing finalized blocks, so drop any repeated queued blocks, // and return an error. // // TODO: track the latest sent height, and drop any blocks under that height - // every time we send some blocks (like QueuedNonFinalizedBlocks) - Self::send_finalized_block_error( + // every time we send some blocks (like QueuedSemanticallyVerifiedBlocks) + Self::send_checkpoint_verified_block_error( queued, "already finished committing finalized blocks: dropped duplicate block, \ block is already committed to the state", @@ -510,39 +511,39 @@ impl StateService { ); } - if self.queued_finalized_blocks.is_empty() { - self.max_queued_finalized_height = f64::NAN; - } else if self.max_queued_finalized_height.is_nan() - || self.max_queued_finalized_height < queued_height.0 as f64 + if self.finalized_state_queued_blocks.is_empty() { + self.max_finalized_queue_height = f64::NAN; + } else if self.max_finalized_queue_height.is_nan() + || self.max_finalized_queue_height < queued_height.0 as f64 { // if there are still blocks in the queue, then either: // - the new block was lower than the old maximum, and there was a gap before it, // so the maximum is still the same (and we skip this code), or // - the new block is higher than the old maximum, and there is at least one gap // between the finalized tip and the new maximum - self.max_queued_finalized_height = queued_height.0 as f64; + self.max_finalized_queue_height = queued_height.0 as f64; } metrics::gauge!( "state.checkpoint.queued.max.height", - self.max_queued_finalized_height, + self.max_finalized_queue_height, ); metrics::gauge!( "state.checkpoint.queued.block.count", - self.queued_finalized_blocks.len() as f64, + self.finalized_state_queued_blocks.len() as f64, ); rsp_rx } - /// Finds queued finalized blocks to be committed to the state in order, + /// Finds finalized state queue blocks to be committed to the state in order, /// removes them from the queue, and sends them to the block commit task. /// /// After queueing a finalized block, this method checks whether the newly /// queued block (and any of its descendants) can be committed to the state. /// /// Returns an error if the block commit channel has been closed. - pub fn drain_queue_and_commit_finalized(&mut self) { + pub fn drain_finalized_queue_and_commit(&mut self) { use tokio::sync::mpsc::error::{SendError, TryRecvError}; // # Correctness & Performance @@ -551,8 +552,8 @@ impl StateService { // because it is called directly from the tokio executor's Future threads. // If a block failed, we need to start again from a valid tip. - match self.invalid_block_reset_receiver.try_recv() { - Ok(reset_tip_hash) => self.last_sent_finalized_block_hash = reset_tip_hash, + match self.invalid_block_write_reset_receiver.try_recv() { + Ok(reset_tip_hash) => self.finalized_block_write_last_sent_hash = reset_tip_hash, Err(TryRecvError::Disconnected) => { info!("Block commit task closed the block reset channel. Is Zebra shutting down?"); return; @@ -562,12 +563,12 @@ impl StateService { } while let Some(queued_block) = self - .queued_finalized_blocks - .remove(&self.last_sent_finalized_block_hash) + .finalized_state_queued_blocks + .remove(&self.finalized_block_write_last_sent_hash) { let last_sent_finalized_block_height = queued_block.0.height; - self.last_sent_finalized_block_hash = queued_block.0.hash; + self.finalized_block_write_last_sent_hash = queued_block.0.hash; // If we've finished sending finalized blocks, ignore any repeated blocks. // (Blocks can be repeated after a syncer reset.) @@ -577,7 +578,7 @@ impl StateService { // If the receiver is closed, we can't send any more blocks. if let Err(SendError(queued)) = send_result { // If Zebra is shutting down, drop blocks and return an error. - Self::send_finalized_block_error( + Self::send_checkpoint_verified_block_error( queued, "block commit task exited. Is Zebra shutting down?", ); @@ -595,15 +596,18 @@ impl StateService { } } - /// Drops all queued finalized blocks, and sends an error on their result channels. + /// Drops all finalized state queue blocks, and sends an error on their result channels. fn clear_finalized_block_queue(&mut self, error: impl Into + Clone) { - for (_hash, queued) in self.queued_finalized_blocks.drain() { - Self::send_finalized_block_error(queued, error.clone()); + for (_hash, queued) in self.finalized_state_queued_blocks.drain() { + Self::send_checkpoint_verified_block_error(queued, error.clone()); } } - /// Send an error on a `QueuedFinalized` block's result channel, and drop the block - fn send_finalized_block_error(queued: QueuedFinalized, error: impl Into) { + /// Send an error on a `QueuedCheckpointVerified` block's result channel, and drop the block + fn send_checkpoint_verified_block_error( + queued: QueuedCheckpointVerified, + error: impl Into, + ) { let (finalized, rsp_tx) = queued; // The block sender might have already given up on this block, @@ -612,15 +616,18 @@ impl StateService { std::mem::drop(finalized); } - /// Drops all queued non-finalized blocks, and sends an error on their result channels. + /// Drops all non-finalized state queue blocks, and sends an error on their result channels. fn clear_non_finalized_block_queue(&mut self, error: impl Into + Clone) { - for (_hash, queued) in self.queued_non_finalized_blocks.drain() { - Self::send_non_finalized_block_error(queued, error.clone()); + for (_hash, queued) in self.non_finalized_state_queued_blocks.drain() { + Self::send_semantically_verified_block_error(queued, error.clone()); } } - /// Send an error on a `QueuedNonFinalized` block's result channel, and drop the block - fn send_non_finalized_block_error(queued: QueuedNonFinalized, error: impl Into) { + /// Send an error on a `QueuedSemanticallyVerified` block's result channel, and drop the block + fn send_semantically_verified_block_error( + queued: QueuedSemanticallyVerified, + error: impl Into, + ) { let (finalized, rsp_tx) = queued; // The block sender might have already given up on this block, @@ -637,7 +644,7 @@ impl StateService { /// /// [1]: https://zebra.zfnd.org/dev/rfcs/0005-state-updates.html#committing-non-finalized-blocks #[instrument(level = "debug", skip(self, semantically_verrified))] - fn queue_and_commit_non_finalized( + fn queue_and_commit_to_non_finalized_state( &mut self, semantically_verrified: SemanticallyVerifiedBlock, ) -> oneshot::Receiver> { @@ -645,7 +652,7 @@ impl StateService { let parent_hash = semantically_verrified.block.header.previous_block_hash; if self - .sent_non_finalized_block_hashes + .non_finalized_block_write_sent_hashes .contains(&semantically_verrified.hash) { let (rsp_tx, rsp_rx) = oneshot::channel(); @@ -672,7 +679,7 @@ impl StateService { // has been queued but not yet committed to the state fails the older request and replaces // it with the newer request. let rsp_rx = if let Some((_, old_rsp_tx)) = self - .queued_non_finalized_blocks + .non_finalized_state_queued_blocks .get_mut(&semantically_verrified.hash) { tracing::debug!("replacing older queued request with new request"); @@ -682,7 +689,7 @@ impl StateService { rsp_rx } else { let (rsp_tx, rsp_rx) = oneshot::channel(); - self.queued_non_finalized_blocks + self.non_finalized_state_queued_blocks .queue((semantically_verrified, rsp_tx)); rsp_rx }; @@ -697,9 +704,10 @@ impl StateService { // TODO: configure the state with the last checkpoint hash instead? if self.finalized_block_write_sender.is_some() && self - .queued_non_finalized_blocks - .has_queued_children(self.last_sent_finalized_block_hash) - && self.read_service.db.finalized_tip_hash() == self.last_sent_finalized_block_hash + .non_finalized_state_queued_blocks + .has_queued_children(self.finalized_block_write_last_sent_hash) + && self.read_service.db.finalized_tip_hash() + == self.finalized_block_write_last_sent_hash { // Tell the block write task to stop committing finalized blocks, // and move on to committing non-finalized blocks. @@ -728,10 +736,10 @@ impl StateService { "Finalized state must have at least one block before committing non-finalized state", ); - self.queued_non_finalized_blocks + self.non_finalized_state_queued_blocks .prune_by_height(finalized_tip_height); - self.sent_non_finalized_block_hashes + self.non_finalized_block_write_sent_hashes .prune_by_height(finalized_tip_height); } @@ -740,7 +748,7 @@ impl StateService { /// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks. fn can_fork_chain_at(&self, hash: &block::Hash) -> bool { - self.sent_non_finalized_block_hashes.contains(hash) + self.non_finalized_block_write_sent_hashes.contains(hash) || &self.read_service.db.finalized_tip_hash() == hash } @@ -765,18 +773,19 @@ impl StateService { while let Some(parent_hash) = new_parents.pop() { let queued_children = self - .queued_non_finalized_blocks + .non_finalized_state_queued_blocks .dequeue_children(parent_hash); for queued_child in queued_children { let (SemanticallyVerifiedBlock { hash, .. }, _) = queued_child; - self.sent_non_finalized_block_hashes.add(&queued_child.0); + self.non_finalized_block_write_sent_hashes + .add(&queued_child.0); let send_result = non_finalized_block_write_sender.send(queued_child); if let Err(SendError(queued)) = send_result { // If Zebra is shutting down, drop blocks and return an error. - Self::send_non_finalized_block_error( + Self::send_semantically_verified_block_error( queued, "block commit task exited. Is Zebra shutting down?", ); @@ -792,7 +801,7 @@ impl StateService { } } - self.sent_non_finalized_block_hashes.finish_batch(); + self.non_finalized_block_write_sent_hashes.finish_batch(); }; } @@ -905,7 +914,7 @@ impl Service for StateService { let span = Span::current(); match req { - // Uses queued_non_finalized_blocks and pending_utxos in the StateService + // Uses non_finalized_state_queued_blocks and pending_utxos in the StateService // Accesses shared writeable state in the StateService, NonFinalizedState, and ZebraDb. Request::CommitSemanticallyVerifiedBlock(semantically_verified) => { self.assert_block_can_be_validated(&semantically_verified); @@ -925,7 +934,9 @@ impl Service for StateService { // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html let rsp_rx = tokio::task::block_in_place(move || { - span.in_scope(|| self.queue_and_commit_non_finalized(semantically_verified)) + span.in_scope(|| { + self.queue_and_commit_to_non_finalized_state(semantically_verified) + }) }); // TODO: @@ -954,7 +965,7 @@ impl Service for StateService { .boxed() } - // Uses queued_finalized_blocks and pending_utxos in the StateService. + // Uses finalized_state_queued_blocks and pending_utxos in the StateService. // Accesses shared writeable state in the StateService. Request::CommitCheckpointVerifiedBlock(finalized) => { // # Consensus @@ -971,7 +982,7 @@ impl Service for StateService { // // This method doesn't block, access the database, or perform CPU-intensive tasks, // so we can run it directly in the tokio executor's Future threads. - let rsp_rx = self.queue_and_commit_finalized(finalized); + let rsp_rx = self.queue_and_commit_to_finalized_state(finalized); // TODO: // - check for panics in the block write task here, @@ -996,7 +1007,7 @@ impl Service for StateService { .boxed() } - // Uses pending_utxos and queued_non_finalized_blocks in the StateService. + // Uses pending_utxos and non_finalized_state_queued_blocks in the StateService. // If the UTXO isn't in the queued blocks, runs concurrently using the ReadStateService. Request::AwaitUtxo(outpoint) => { // Prepare the AwaitUtxo future from PendingUxtos. @@ -1008,7 +1019,7 @@ impl Service for StateService { // Check the non-finalized block queue outside the returned future, // so we can access mutable state fields. - if let Some(utxo) = self.queued_non_finalized_blocks.utxo(&outpoint) { + if let Some(utxo) = self.non_finalized_state_queued_blocks.utxo(&outpoint) { self.pending_utxos.respond(&outpoint, utxo); // We're finished, the returned future gets the UTXO from the respond() channel. @@ -1018,7 +1029,7 @@ impl Service for StateService { } // Check the sent non-finalized blocks - if let Some(utxo) = self.sent_non_finalized_block_hashes.utxo(&outpoint) { + if let Some(utxo) = self.non_finalized_block_write_sent_hashes.utxo(&outpoint) { self.pending_utxos.respond(&outpoint, utxo); // We're finished, the returned future gets the UTXO from the respond() channel. @@ -1027,7 +1038,7 @@ impl Service for StateService { return response_fut; } - // We ignore any UTXOs in FinalizedState.queued_finalized_blocks, + // We ignore any UTXOs in FinalizedState.finalized_state_queued_blocks, // because it is only used during checkpoint verification. // // This creates a rare race condition, but it doesn't seem to happen much in practice. diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 1ac34e2c5db..c6ca264f38e 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -24,7 +24,7 @@ use zebra_chain::{block, parameters::Network}; use crate::{ request::ContextuallyVerifiedBlockWithTrees, - service::{check, QueuedFinalized}, + service::{check, QueuedCheckpointVerified}, BoxError, CheckpointVerifiedBlock, CloneError, Config, }; @@ -167,7 +167,7 @@ impl FinalizedState { /// order. pub fn commit_finalized( &mut self, - ordered_block: QueuedFinalized, + ordered_block: QueuedCheckpointVerified, ) -> Result { let (checkpoint_verified, rsp_tx) = ordered_block; let result = self.commit_finalized_direct( diff --git a/zebra-state/src/service/queued_blocks.rs b/zebra-state/src/service/queued_blocks.rs index 9f350ea2c05..7a009605c53 100644 --- a/zebra-state/src/service/queued_blocks.rs +++ b/zebra-state/src/service/queued_blocks.rs @@ -15,14 +15,14 @@ use crate::{BoxError, CheckpointVerifiedBlock, SemanticallyVerifiedBlock}; #[cfg(test)] mod tests; -/// A queued finalized block, and its corresponding [`Result`] channel. -pub type QueuedFinalized = ( +/// A finalized state queue block, and its corresponding [`Result`] channel. +pub type QueuedCheckpointVerified = ( CheckpointVerifiedBlock, oneshot::Sender>, ); -/// A queued non-finalized block, and its corresponding [`Result`] channel. -pub type QueuedNonFinalized = ( +/// A non-finalized state queue block, and its corresponding [`Result`] channel. +pub type QueuedSemanticallyVerified = ( SemanticallyVerifiedBlock, oneshot::Sender>, ); @@ -31,7 +31,7 @@ pub type QueuedNonFinalized = ( #[derive(Debug, Default)] pub struct QueuedBlocks { /// Blocks awaiting their parent blocks for contextual verification. - blocks: HashMap, + blocks: HashMap, /// Hashes from `queued_blocks`, indexed by parent hash. by_parent: HashMap>, /// Hashes from `queued_blocks`, indexed by block height. @@ -47,7 +47,7 @@ impl QueuedBlocks { /// /// - if a block with the same `block::Hash` has already been queued. #[instrument(skip(self), fields(height = ?new.0.height, hash = %new.0.hash))] - pub fn queue(&mut self, new: QueuedNonFinalized) { + pub fn queue(&mut self, new: QueuedSemanticallyVerified) { let new_hash = new.0.hash; let new_height = new.0.height; let parent_hash = new.0.block.header.previous_block_hash; @@ -86,7 +86,10 @@ impl QueuedBlocks { /// Dequeue and return all blocks that were waiting for the arrival of /// `parent`. #[instrument(skip(self), fields(%parent_hash))] - pub fn dequeue_children(&mut self, parent_hash: block::Hash) -> Vec { + pub fn dequeue_children( + &mut self, + parent_hash: block::Hash, + ) -> Vec { let queued_children = self .by_parent .remove(&parent_hash) @@ -176,7 +179,7 @@ impl QueuedBlocks { } /// Return the queued block if it has already been registered - pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedNonFinalized> { + pub fn get_mut(&mut self, hash: &block::Hash) -> Option<&mut QueuedSemanticallyVerified> { self.blocks.get_mut(hash) } @@ -208,7 +211,7 @@ impl QueuedBlocks { /// Returns all key-value pairs of blocks as an iterator. /// /// Doesn't update the metrics, because it is only used when the state is being dropped. - pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedNonFinalized> { + pub fn drain(&mut self) -> Drain<'_, block::Hash, QueuedSemanticallyVerified> { self.known_utxos.clear(); self.known_utxos.shrink_to_fit(); self.by_parent.clear(); diff --git a/zebra-state/src/service/queued_blocks/tests/vectors.rs b/zebra-state/src/service/queued_blocks/tests/vectors.rs index bd8dcbeb8e2..203caf706e6 100644 --- a/zebra-state/src/service/queued_blocks/tests/vectors.rs +++ b/zebra-state/src/service/queued_blocks/tests/vectors.rs @@ -9,17 +9,17 @@ use zebra_test::prelude::*; use crate::{ arbitrary::Prepare, - service::queued_blocks::{QueuedBlocks, QueuedNonFinalized}, + service::queued_blocks::{QueuedBlocks, QueuedSemanticallyVerified}, tests::FakeChainHelper, }; // Quick helper trait for making queued blocks with throw away channels trait IntoQueued { - fn into_queued(self) -> QueuedNonFinalized; + fn into_queued(self) -> QueuedSemanticallyVerified; } impl IntoQueued for Arc { - fn into_queued(self) -> QueuedNonFinalized { + fn into_queued(self) -> QueuedSemanticallyVerified { let (rsp_tx, _) = oneshot::channel(); (self.prepare(), rsp_tx) } diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index aed292313f4..5adfaabdf39 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -424,7 +424,7 @@ proptest! { expected_finalized_value_pool += *block_value_pool; } - let result_receiver = state_service.queue_and_commit_finalized(block.clone()); + let result_receiver = state_service.queue_and_commit_to_finalized_state(block.clone()); let result = result_receiver.blocking_recv(); prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result); @@ -450,7 +450,7 @@ proptest! { let block_value_pool = &block.block.chain_value_pool_change(&transparent::utxos_from_ordered_utxos(utxos))?; expected_non_finalized_value_pool += *block_value_pool; - let result_receiver = state_service.queue_and_commit_non_finalized(block.clone()); + let result_receiver = state_service.queue_and_commit_to_non_finalized_state(block.clone()); let result = result_receiver.blocking_recv(); prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result); @@ -509,7 +509,7 @@ proptest! { TipAction::grow_with(expected_block.clone().into()) }; - let result_receiver = state_service.queue_and_commit_finalized(block); + let result_receiver = state_service.queue_and_commit_to_finalized_state(block); let result = result_receiver.blocking_recv(); prop_assert!(result.is_ok(), "unexpected failed finalized block commit: {:?}", result); @@ -532,7 +532,7 @@ proptest! { TipAction::grow_with(expected_block.clone().into()) }; - let result_receiver = state_service.queue_and_commit_non_finalized(block); + let result_receiver = state_service.queue_and_commit_to_non_finalized_state(block); let result = result_receiver.blocking_recv(); prop_assert!(result.is_ok(), "unexpected failed non-finalized block commit: {:?}", result); diff --git a/zebra-state/src/service/write.rs b/zebra-state/src/service/write.rs index 74d6de14400..94392d2aa2c 100644 --- a/zebra-state/src/service/write.rs +++ b/zebra-state/src/service/write.rs @@ -17,7 +17,7 @@ use crate::{ check, finalized_state::{FinalizedState, ZebraDb}, non_finalized_state::NonFinalizedState, - queued_blocks::{QueuedFinalized, QueuedNonFinalized}, + queued_blocks::{QueuedCheckpointVerified, QueuedSemanticallyVerified}, BoxError, ChainTipBlock, ChainTipSender, CloneError, }, CommitSemanticallyVerifiedError, SemanticallyVerifiedBlock, @@ -131,8 +131,8 @@ fn update_latest_chain_channels( ) )] pub fn write_blocks_from_channels( - mut finalized_block_write_receiver: UnboundedReceiver, - mut non_finalized_block_write_receiver: UnboundedReceiver, + mut finalized_block_write_receiver: UnboundedReceiver, + mut non_finalized_block_write_receiver: UnboundedReceiver, mut finalized_state: FinalizedState, mut non_finalized_state: NonFinalizedState, invalid_block_reset_sender: UnboundedSender,