Skip to content

Commit

Permalink
revert changes related to checking queue
Browse files Browse the repository at this point in the history
  • Loading branch information
arya2 committed Mar 23, 2023
1 parent 9118125 commit 2c15632
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 83 deletions.
2 changes: 1 addition & 1 deletion zebra-consensus/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ where
.map_err(|source| VerifyBlockError::Depth { source, hash })?
{
zs::Response::KnownBlock(Some(location)) => {
return Err(BlockError::AlreadyInState(hash, location).into())
return Err(BlockError::AlreadyInChain(hash, location).into())
}
zs::Response::KnownBlock(None) => {}
_ => unreachable!("wrong response to Request::KnownBlock"),
Expand Down
4 changes: 2 additions & 2 deletions zebra-consensus/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ pub enum BlockError {
DuplicateTransaction,

#[error("block {0:?} is already in present in the state {1:?}")]
AlreadyInState(zebra_chain::block::Hash, zebra_state::KnownBlock),
AlreadyInChain(zebra_chain::block::Hash, zebra_state::KnownBlock),

#[error("invalid block {0:?}: missing block height")]
MissingHeight(zebra_chain::block::Hash),
Expand Down Expand Up @@ -311,6 +311,6 @@ impl BlockError {
/// Returns `true` if this is definitely a duplicate request.
/// Some duplicate requests might not be detected, and therefore return `false`.
pub fn is_duplicate_request(&self) -> bool {
matches!(self, BlockError::AlreadyInState(..))
matches!(self, BlockError::AlreadyInChain(..))
}
}
80 changes: 30 additions & 50 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use crate::{
MAX_FIND_BLOCK_HASHES_RESULTS, MAX_FIND_BLOCK_HEADERS_RESULTS_FOR_ZEBRA,
MAX_LEGACY_CHAIN_BLOCKS,
},
response::KnownBlock,
service::{
block_iter::any_ancestor_blocks,
chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip},
Expand Down Expand Up @@ -163,7 +162,7 @@ pub(crate) struct StateService {

/// A set of block hashes that have been sent to the block write task.
/// Hashes of blocks below the finalized tip height are periodically pruned.
sent_blocks: SentHashes,
sent_non_finalized_block_hashes: SentHashes,

/// If an invalid block is sent on `finalized_block_write_sender`
/// or `non_finalized_block_write_sender`,
Expand Down Expand Up @@ -409,7 +408,7 @@ impl StateService {
non_finalized_block_write_sender: Some(non_finalized_block_write_sender),
finalized_block_write_sender: Some(finalized_block_write_sender),
last_sent_finalized_block_hash,
sent_blocks: SentHashes::default(),
sent_non_finalized_block_hashes: SentHashes::default(),
invalid_block_reset_receiver,
pending_utxos,
last_prune: Instant::now(),
Expand Down Expand Up @@ -472,7 +471,8 @@ impl StateService {
// If we're close to the final checkpoint, make the block's UTXOs available for
// full verification of non-finalized blocks, even when it is in the channel.
if self.is_close_to_final_checkpoint(queued_height) {
self.sent_blocks.add_finalized(&finalized)
self.sent_non_finalized_block_hashes
.add_finalized(&finalized)
}

let (rsp_tx, rsp_rx) = oneshot::channel();
Expand Down Expand Up @@ -551,39 +551,22 @@ impl StateService {

// If a block failed, we need to start again from a valid tip.
match self.invalid_block_reset_receiver.try_recv() {
Ok(reset_tip_hash) => {
// any blocks that may have been sent were either committed or dropped
// if a reset signal has been received, so we can clear `sent_blocks` and
// let Zebra re-download/verify blocks that were dropped.
self.sent_blocks.clear();
self.last_sent_finalized_block_hash = reset_tip_hash
}
Ok(reset_tip_hash) => self.last_sent_finalized_block_hash = reset_tip_hash,
Err(TryRecvError::Disconnected) => {
info!("Block commit task closed the block reset channel. Is Zebra shutting down?");
return;
}
// There are no errors, so we can just use the last block hash we sent
Err(TryRecvError::Empty) => {
if let Some(finalized_tip_height) = self.read_service.db.finalized_tip_height() {
self.sent_blocks.prune_by_height(finalized_tip_height);
}
}
Err(TryRecvError::Empty) => {}
}

while let Some(queued_block) = self
.queued_finalized_blocks
.remove(&self.last_sent_finalized_block_hash)
{
let (finalized, _) = &queued_block;
let &FinalizedBlock { hash, height, .. } = finalized;
let last_sent_finalized_block_height = queued_block.0.height;

self.last_sent_finalized_block_hash = hash;

// If we're not close to the final checkpoint, add the hash for checking if
// a block is present in a queue when called with `Request::KnownBlock`.
if !self.is_close_to_final_checkpoint(height) {
self.sent_blocks.add_finalized_hash(hash, height);
}
self.last_sent_finalized_block_hash = queued_block.0.hash;

// If we've finished sending finalized blocks, ignore any repeated blocks.
// (Blocks can be repeated after a syncer reset.)
Expand All @@ -602,7 +585,10 @@ impl StateService {
"block commit task exited. Is Zebra shutting down?",
);
} else {
metrics::gauge!("state.checkpoint.sent.block.height", height.0 as f64,);
metrics::gauge!(
"state.checkpoint.sent.block.height",
last_sent_finalized_block_height.0 as f64,
);
};
}
}
Expand Down Expand Up @@ -657,7 +643,10 @@ impl StateService {
tracing::debug!(block = %prepared.block, "queueing block for contextual verification");
let parent_hash = prepared.block.header.previous_block_hash;

if self.sent_blocks.contains(&prepared.hash) {
if self
.sent_non_finalized_block_hashes
.contains(&prepared.hash)
{
let (rsp_tx, rsp_rx) = oneshot::channel();
let _ = rsp_tx.send(Err(
"block has already been sent to be committed to the state".into(),
Expand Down Expand Up @@ -724,26 +713,28 @@ impl StateService {
return rsp_rx;
}

// Wait until block commit task is ready to write non-finalized blocks before dequeuing them
self.send_ready_non_finalized_queued(parent_hash);
if self.finalized_block_write_sender.is_none() {
// Wait until block commit task is ready to write non-finalized blocks before dequeuing them
self.send_ready_non_finalized_queued(parent_hash);

let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
let finalized_tip_height = self.read_service.db.finalized_tip_height().expect(
"Finalized state must have at least one block before committing non-finalized state",
);

self.queued_non_finalized_blocks
.prune_by_height(finalized_tip_height);
self.queued_non_finalized_blocks
.prune_by_height(finalized_tip_height);

self.sent_blocks.prune_by_height(finalized_tip_height);
self.sent_non_finalized_block_hashes
.prune_by_height(finalized_tip_height);
}

rsp_rx
}

/// Returns `true` if `hash` is a valid previous block hash for new non-finalized blocks.
fn can_fork_chain_at(&self, hash: &block::Hash) -> bool {
self.finalized_block_write_sender.is_none()
&& (self.sent_blocks.contains(hash)
|| &self.read_service.db.finalized_tip_hash() == hash)
self.sent_non_finalized_block_hashes.contains(hash)
|| &self.read_service.db.finalized_tip_hash() == hash
}

/// Returns `true` if `queued_height` is near the final checkpoint.
Expand Down Expand Up @@ -773,7 +764,7 @@ impl StateService {
for queued_child in queued_children {
let (PreparedBlock { hash, .. }, _) = queued_child;

self.sent_blocks.add(&queued_child.0);
self.sent_non_finalized_block_hashes.add(&queued_child.0);
let send_result = non_finalized_block_write_sender.send(queued_child);

if let Err(SendError(queued)) = send_result {
Expand All @@ -794,7 +785,7 @@ impl StateService {
}
}

self.sent_blocks.finish_batch();
self.sent_non_finalized_block_hashes.finish_batch();
};
}

Expand All @@ -816,14 +807,6 @@ impl StateService {
blocks"
);
}

/// Returns true if the block hash is queued or has been sent to be validated and committed.
/// Returns false otherwise.
fn is_block_queued(&self, hash: &block::Hash) -> bool {
self.queued_non_finalized_blocks.contains_block_hash(hash)
|| self.queued_finalized_blocks.contains_key(hash)
|| self.sent_blocks.contains(hash)
}
}

impl ReadStateService {
Expand Down Expand Up @@ -1028,7 +1011,7 @@ impl Service<Request> for StateService {
}

// Check the sent non-finalized blocks
if let Some(utxo) = self.sent_blocks.utxo(&outpoint) {
if let Some(utxo) = self.sent_non_finalized_block_hashes.utxo(&outpoint) {
self.pending_utxos.respond(&outpoint, utxo);

// We're finished, the returned future gets the UTXO from the respond() channel.
Expand Down Expand Up @@ -1085,16 +1068,13 @@ impl Service<Request> for StateService {
Request::KnownBlock(hash) => {
let timer = CodeTimer::start();

let is_block_queued = self.is_block_queued(&hash);

let read_service = self.read_service.clone();

async move {
let response = read::non_finalized_state_contains_block_hash(
&read_service.latest_non_finalized_state(),
hash,
)
.or(is_block_queued.then_some(KnownBlock::Queue))
.or_else(|| read::finalized_state_contains_block_hash(&read_service.db, hash));

// The work is done in the future.
Expand Down
29 changes: 0 additions & 29 deletions zebra-state/src/service/queued_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,6 @@ impl QueuedBlocks {

self.blocks.drain()
}

/// Returns true if QueuedBlocks contains a block with the given hash.
/// Returns false otherwise.
pub fn contains_block_hash(&self, hash: &block::Hash) -> bool {
self.blocks.contains_key(hash)
}
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -295,20 +289,6 @@ impl SentHashes {
self.update_metrics_for_block(block.height);
}

/// Stores the finalized `block`'s hash, so it can be used to check if a
/// block is in the state queue.
///
/// Assumes that blocks are added in the order of their height between `finish_batch` calls
/// for efficient pruning.
///
/// For more details see `add()`.
pub fn add_finalized_hash(&mut self, hash: block::Hash, height: block::Height) {
self.curr_buf.push_back((hash, height));
self.sent.insert(hash, vec![]);

self.update_metrics_for_block(height);
}

/// Try to look up this UTXO in any sent block.
#[instrument(skip(self))]
pub fn utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
Expand Down Expand Up @@ -355,15 +335,6 @@ impl SentHashes {
self.update_metrics_for_cache();
}

/// Clears all data from `SentBlocks`
pub fn clear(&mut self) {
self.sent.clear();
self.bufs.clear();
self.curr_buf.clear();
self.known_utxos.clear();
self.update_metrics_for_cache();
}

/// Returns true if SentHashes contains the `hash`
pub fn contains(&self, hash: &block::Hash) -> bool {
self.sent.contains_key(hash)
Expand Down
2 changes: 1 addition & 1 deletion zebrad/src/components/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ where
// https://github.com/ZcashFoundation/zebra/issues/2909
let err_str = format!("{e:?}");
if err_str.contains("AlreadyVerified")
|| err_str.contains("AlreadyInState")
|| err_str.contains("AlreadyInChain")
|| err_str.contains("block is already committed to the state")
|| err_str.contains("block has already been sent to be committed to the state")
|| err_str.contains("NotFound")
Expand Down

0 comments on commit 2c15632

Please sign in to comment.