diff --git a/Cargo.lock b/Cargo.lock index 882bca3604f..5fa46c8bd62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3571,6 +3571,7 @@ dependencies = [ "assert_matches", "async-trait", "borsh 1.0.0", + "bytesize", "chrono", "cloud-storage", "derive_more", @@ -3722,6 +3723,7 @@ dependencies = [ "near-chain-configs", "near-chain-primitives", "near-crypto", + "near-o11y", "near-primitives", "near-store", "num-rational", diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 8487838d243..3ac66e5073d 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -13,6 +13,7 @@ use near_epoch_manager::types::BlockHeaderInfo; use near_epoch_manager::{EpochManagerAdapter, RngSeed}; use near_pool::types::TransactionGroupIterator; use near_primitives::account::{AccessKey, Account}; +use near_primitives::block::Tip; use near_primitives::block_header::{Approval, ApprovalInner}; use near_primitives::epoch_manager::block_info::BlockInfo; use near_primitives::epoch_manager::epoch_info::EpochInfo; @@ -954,6 +955,14 @@ impl EpochManagerAdapter for MockEpochManager { Ok(true) } + fn verify_chunk_state_witness_signature_in_epoch( + &self, + _state_witness: &ChunkStateWitness, + _epoch_id: &EpochId, + ) -> Result<bool, Error> { + Ok(true) + } + fn cares_about_shard_from_prev_block( &self, parent_hash: &CryptoHash, @@ -1002,6 +1011,14 @@ impl EpochManagerAdapter for MockEpochManager { Ok(shard_layout != next_shard_layout) } + fn possible_epochs_of_height_around_tip( + &self, + _tip: &Tip, + _height: BlockHeight, + ) -> Result<Vec<EpochId>, EpochError> { + unimplemented!(); + } + #[cfg(feature = "new_epoch_sync")] fn get_all_epoch_hashes( &self, diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index de33b311637..9dcfa8621cc 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -41,6 +41,7 @@ thiserror.workspace = true tokio.workspace = true tracing.workspace = true yansi.workspace = true +bytesize.workspace = true near-async.workspace = true near-cache.workspace = true diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index fa1e42405e2..3a07260fc62 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -351,6 +351,7 @@ impl Client { network_adapter.clone().into_sender(), runtime_adapter.clone(), chunk_endorsement_tracker.clone(), + config.orphan_state_witness_pool_size, ); let chunk_distribution_network = ChunkDistributionNetwork::from_config(&config); Ok(Self { @@ -1640,6 +1641,8 @@ impl Client { self.shards_manager_adapter .send(ShardsManagerRequestFromClient::CheckIncompleteChunks(*block.hash())); + + self.process_ready_orphan_chunk_state_witnesses(&block); } /// Reconcile the transaction pool after processing a block.
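For context, the core flow this patch adds: a ChunkStateWitness that arrives before its previous block is parked in a pool keyed by the missing block, and block postprocessing (the `process_ready_orphan_chunk_state_witnesses(&block)` call added above) drains every witness that was waiting for the block that just arrived. Below is a minimal self-contained sketch of that flow, using simplified hypothetical types (`Witness`, `OrphanPool`) rather than the real nearcore ones:

use std::collections::HashMap;

// Stand-in for CryptoHash in this sketch.
type BlockHash = u64;

#[derive(Clone, Debug)]
struct Witness {
    prev_block_hash: BlockHash,
}

#[derive(Default)]
struct OrphanPool {
    // Witnesses keyed by the block hash they are waiting for.
    waiting: HashMap<BlockHash, Vec<Witness>>,
}

impl OrphanPool {
    // Called when a witness arrives and its previous block is unknown yet.
    fn park(&mut self, witness: Witness) {
        self.waiting.entry(witness.prev_block_hash).or_default().push(witness);
    }

    // Called from block postprocessing, mirroring the role of
    // process_ready_orphan_chunk_state_witnesses(&block) above.
    fn take_ready(&mut self, new_block: BlockHash) -> Vec<Witness> {
        self.waiting.remove(&new_block).unwrap_or_default()
    }
}

fn main() {
    let mut pool = OrphanPool::default();
    pool.park(Witness { prev_block_hash: 99 });
    pool.park(Witness { prev_block_hash: 99 });
    // Block 99 arrives: both parked witnesses become processable.
    assert_eq!(pool.take_ready(99).len(), 2);
    assert!(pool.take_ready(99).is_empty());
}

Note that the real pool (orphan_witness_pool.rs below) uses an LruCache keyed by (shard_id, height) rather than a HashMap keyed by block hash, which is what gives it bounded capacity and the replace-on-same-key behavior exercised by the tests.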
diff --git a/chain/client/src/client_actions.rs b/chain/client/src/client_actions.rs index 11de323e68f..7eb3dada71a 100644 --- a/chain/client/src/client_actions.rs +++ b/chain/client/src/client_actions.rs @@ -1824,35 +1824,13 @@ impl ClientActionHandler for ClientActions { } } -impl ClientActions { - pub fn handle_state_witness_message( - &mut self, - msg: ChunkStateWitnessMessage, - ctx: &mut dyn DelayedActionRunner<Self>, - ) { - let peer_id = msg.peer_id.clone(); - let attempts_remaining = msg.attempts_remaining; - match self.client.process_chunk_state_witness(msg.witness, msg.peer_id, None) { - Err(err) => { - tracing::error!(target: "client", ?err, "Error processing chunk state witness"); - } - Ok(Some(witness)) => { - if attempts_remaining > 0 { - ctx.run_later(Duration::from_millis(100), move |actions, ctx| { - actions.handle_state_witness_message( - ChunkStateWitnessMessage { - witness, - peer_id, - attempts_remaining: attempts_remaining - 1, - }, - ctx, - ); - }); - } else { - tracing::error!(target: "client", "Failed to process chunk state witness even after 5 tries due to missing parent block"); - } - } - Ok(None) => {} +impl ClientActionHandler<ChunkStateWitnessMessage> for ClientActions { + type Result = (); + + #[perf] + fn handle(&mut self, msg: ChunkStateWitnessMessage) -> Self::Result { + if let Err(err) = self.client.process_chunk_state_witness(msg.witness, msg.peer_id, None) { + tracing::error!(target: "client", ?err, "Error processing chunk state witness"); } } } diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 74033415dd4..7c2d2fb432b 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -21,7 +21,6 @@ use near_chunks::adapter::ShardsManagerRequestFromClient; use near_client_primitives::types::Error; use near_epoch_manager::shard_tracker::ShardTracker; use near_epoch_manager::{EpochManagerAdapter, RngSeed}; -use near_network::client::ChunkStateWitnessMessage; use near_network::types::PeerManagerAdapter; use near_o11y::{handler_debug_span, WithSpanContext}; use near_primitives::network::PeerId; @@ -151,18 +150,6 @@ where } } -// This one requires the context for further scheduling of messages, so -// we can't use the generic wrapper above.
-impl Handler<WithSpanContext<ChunkStateWitnessMessage>> for ClientActor { - type Result = (); - - fn handle(&mut self, msg: WithSpanContext<ChunkStateWitnessMessage>, ctx: &mut Context<Self>) { - self.wrap(msg, ctx, "ChunkStateWitnessMessage", |this, msg, ctx| { - this.actions.handle_state_witness_message(msg, ctx) - }) - } -} - /// Returns random seed sampled from the current thread pub fn random_seed_from_thread() -> RngSeed { let mut rng_seed: RngSeed = [0; 32]; diff --git a/chain/client/src/lib.rs b/chain/client/src/lib.rs index af34791cae9..0b330d9593d 100644 --- a/chain/client/src/lib.rs +++ b/chain/client/src/lib.rs @@ -13,12 +13,16 @@ pub use crate::client::{Client, ProduceChunkResult}; pub use crate::client_actions::NetworkAdversarialMessage; pub use crate::client_actor::{start_client, ClientActor}; pub use crate::config_updater::ConfigUpdater; +pub use crate::stateless_validation::chunk_validator::orphan_witness_handling::{ + HandleOrphanWitnessOutcome, MAX_ORPHAN_WITNESS_SIZE, +}; pub use crate::sync::adapter::{SyncAdapter, SyncMessage}; pub use crate::view_client::{start_view_client, ViewClientActor}; pub use near_client_primitives::debug::DebugStatus; pub use near_network::client::{ BlockApproval, BlockResponse, ProcessTxRequest, ProcessTxResponse, SetNetworkInfo, }; +pub use stateless_validation::processing_tracker::{ProcessingDoneTracker, ProcessingDoneWaiter}; pub mod adapter; pub mod adversarial; diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index b153e3cbc99..a9bf2324448 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -583,6 +583,34 @@ pub(crate) static CHUNK_STATE_WITNESS_TOTAL_SIZE: Lazy<HistogramVec> = Lazy::new .unwrap() }); +pub(crate) static ORPHAN_CHUNK_STATE_WITNESSES_TOTAL_COUNT: Lazy<IntCounterVec> = Lazy::new(|| { + try_create_int_counter_vec( + "near_orphan_chunk_state_witness_total_count", + "Total number of orphaned chunk state witnesses that were saved for later processing", + &["shard_id"], + ) + .unwrap() +}); + +pub(crate) static ORPHAN_CHUNK_STATE_WITNESS_POOL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| { + try_create_int_gauge_vec( + "near_orphan_chunk_state_witness_pool_size", + "Number of orphaned witnesses kept in OrphanStateWitnessPool (by shard_id)", + &["shard_id"], + ) + .unwrap() +}); + +pub(crate) static ORPHAN_CHUNK_STATE_WITNESS_POOL_MEMORY_USED: Lazy<IntGaugeVec> = + Lazy::new(|| { + try_create_int_gauge_vec( + "near_orphan_chunk_state_witness_pool_memory_used", + "Memory in bytes consumed by the OrphanStateWitnessPool (by shard_id)", + &["shard_id"], + ) + .unwrap() + }); + pub(crate) static BLOCK_PRODUCER_ENDORSED_STAKE_RATIO: Lazy<HistogramVec> = Lazy::new(|| { try_create_histogram_vec( "near_block_producer_endorsed_stake_ratio", diff --git a/chain/client/src/stateless_validation/chunk_validator.rs b/chain/client/src/stateless_validation/chunk_validator/mod.rs similarity index 94% rename from chain/client/src/stateless_validation/chunk_validator.rs rename to chain/client/src/stateless_validation/chunk_validator/mod.rs index a92a3557af0..584ad8f33da 100644 --- a/chain/client/src/stateless_validation/chunk_validator.rs +++ b/chain/client/src/stateless_validation/chunk_validator/mod.rs @@ -1,3 +1,6 @@ +pub mod orphan_witness_handling; +pub mod orphan_witness_pool; + use super::processing_tracker::ProcessingDoneTracker; use crate::stateless_validation::chunk_endorsement_tracker::ChunkEndorsementTracker; use crate::{metrics, Client}; @@ -31,6 +34,7 @@ use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::ShardId; use near_primitives::validator_signer::ValidatorSigner; use near_store::PartialStorage; +use
orphan_witness_pool::OrphanStateWitnessPool; use std::collections::HashMap; use std::sync::Arc; @@ -53,6 +57,7 @@ pub struct ChunkValidator { network_sender: Sender<PeerManagerMessageRequest>, runtime_adapter: Arc<dyn RuntimeAdapter>, chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>, + orphan_witness_pool: OrphanStateWitnessPool, } impl ChunkValidator { @@ -62,6 +67,7 @@ impl ChunkValidator { network_sender: Sender<PeerManagerMessageRequest>, runtime_adapter: Arc<dyn RuntimeAdapter>, chunk_endorsement_tracker: Arc<ChunkEndorsementTracker>, + orphan_witness_pool_size: usize, ) -> Self { Self { my_signer, @@ -69,6 +75,7 @@ impl ChunkValidator { network_sender, runtime_adapter, chunk_endorsement_tracker, + orphan_witness_pool: OrphanStateWitnessPool::new(orphan_witness_pool_size), } } @@ -639,14 +646,40 @@ impl Client { witness: ChunkStateWitness, peer_id: PeerId, processing_done_tracker: Option<ProcessingDoneTracker>, - ) -> Result<Option<ChunkStateWitness>, Error> { + ) -> Result<(), Error> { let prev_block_hash = witness.inner.chunk_header.prev_block_hash(); - if self.chain.get_block(prev_block_hash).is_err() { - return Ok(Some(witness)); + let prev_block = match self.chain.get_block(prev_block_hash) { + Ok(block) => block, + Err(Error::DBNotFoundErr(_)) => { + // Previous block isn't available at the moment, add this witness to the orphan pool. + self.handle_orphan_state_witness(witness)?; + return Ok(()); + } + Err(err) => return Err(err), + }; + self.process_chunk_state_witness_with_prev_block( + witness, + peer_id, + &prev_block, + processing_done_tracker, + ) + } + + pub fn process_chunk_state_witness_with_prev_block( + &mut self, + witness: ChunkStateWitness, + peer_id: PeerId, + prev_block: &Block, + processing_done_tracker: Option<ProcessingDoneTracker>, + ) -> Result<(), Error> { + if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() { + return Err(Error::Other(format!( + "process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})", + witness.inner.chunk_header.prev_block_hash(), + prev_block.hash() + ))); } - // TODO(#10265): If the previous block does not exist, we should - // queue this (similar to orphans) to retry later. let result = self.chunk_validator.start_validating_chunk( witness, &self.chain, @@ -661,6 +694,6 @@ impl Client { }, )); } - result.map(|_| None) + result } } diff --git a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs new file mode 100644 index 00000000000..00bc6aac408 --- /dev/null +++ b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_handling.rs @@ -0,0 +1,199 @@ +//! This module contains the logic of handling orphaned chunk state witnesses. +//! To process a ChunkStateWitness we need its previous block, but sometimes +//! the witness shows up before the previous block is available, so it can't be +//! processed immediately. In such cases the witness becomes an orphaned witness +//! and it's kept in the pool until the required block arrives. Once the block +//! arrives, all witnesses that were waiting for it can be processed. + +use crate::Client; +use bytesize::ByteSize; +use itertools::Itertools; +use near_chain::Block; +use near_chain_primitives::Error; +use near_primitives::network::PeerId; +use near_primitives::stateless_validation::ChunkStateWitness; +use near_primitives::types::{BlockHeight, EpochId}; +use std::ops::Range; + +/// We keep only orphan witnesses that are within this distance of +/// the current chain head. This helps to reduce the size of +/// OrphanStateWitnessPool and protects against spam attacks.
+/// The range starts at 2 because a witness at height of head+1 would +/// have the previous block available (the chain head), so it wouldn't be an orphan. +pub const ALLOWED_ORPHAN_WITNESS_DISTANCE_FROM_HEAD: Range<BlockHeight> = 2..6; + +/// We keep only orphan witnesses which are smaller than this size. +/// This limits the maximum memory usage of OrphanStateWitnessPool. +/// TODO(#10259) - consider merging this limit with the non-orphan witness size limit. +pub const MAX_ORPHAN_WITNESS_SIZE: ByteSize = ByteSize::mb(40); + +impl Client { + pub fn handle_orphan_state_witness( + &mut self, + witness: ChunkStateWitness, + ) -> Result<HandleOrphanWitnessOutcome, Error> { + let chunk_header = &witness.inner.chunk_header; + let witness_height = chunk_header.height_created(); + let witness_shard = chunk_header.shard_id(); + + let _span = tracing::debug_span!(target: "client", + "handle_orphan_state_witness", + witness_height, + witness_shard, + witness_chunk = ?chunk_header.chunk_hash(), + witness_prev_block = ?chunk_header.prev_block_hash(), + ) + .entered(); + + // Don't save orphaned state witnesses which are far away from the current chain head. + let chain_head = &self.chain.head()?; + let head_distance = witness_height.saturating_sub(chain_head.height); + if !ALLOWED_ORPHAN_WITNESS_DISTANCE_FROM_HEAD.contains(&head_distance) { + tracing::debug!( + target: "client", + head_height = chain_head.height, + "Not saving an orphaned ChunkStateWitness because its height isn't within the allowed height range"); + return Ok(HandleOrphanWitnessOutcome::TooFarFromHead { + witness_height, + head_height: chain_head.height, + }); + } + + // Don't save orphaned state witnesses which are bigger than the allowed limit. + let witness_size = borsh::to_vec(&witness)?.len(); + let witness_size_u64: u64 = witness_size.try_into().map_err(|_| { + Error::Other(format!("Cannot convert witness size to u64: {}", witness_size)) + })?; + if witness_size_u64 > MAX_ORPHAN_WITNESS_SIZE.as_u64() { + tracing::warn!( + target: "client", + witness_height, + witness_shard, + witness_chunk = ?chunk_header.chunk_hash(), + witness_prev_block = ?chunk_header.prev_block_hash(), + witness_size, + "Not saving an orphaned ChunkStateWitness because it's too big. This is unexpected."); + return Ok(HandleOrphanWitnessOutcome::TooBig(witness_size)); + } + + // Try to find the EpochId to which this witness will belong based on its height. + // It's not always possible to determine the exact epoch_id because the exact + // starting height of the next epoch isn't known until it actually starts, + // so things can get unclear around epoch boundaries. + // Let's collect the epoch_ids in which the witness might possibly be. + let possible_epochs = + self.epoch_manager.possible_epochs_of_height_around_tip(&chain_head, witness_height)?; + + // Try to validate the witness assuming that it resides in one of the possible epochs. + // The witness must pass validation in one of these epochs before it can be admitted to the pool. + let mut epoch_validation_result: Option<Result<(), Error>> = None; + for epoch_id in possible_epochs { + match self.partially_validate_orphan_witness_in_epoch(&witness, &epoch_id) { + Ok(()) => { + epoch_validation_result = Some(Ok(())); + break; + } + Err(err) => epoch_validation_result = Some(Err(err)), + } + } + match epoch_validation_result { + Some(Ok(())) => {} // Validation passed in one of the possible epochs, witness can be added to the pool.
+ Some(Err(err)) => { + // Validation failed in all possible epochs, reject the witness + return Err(err); + } + None => { + // possible_epochs was empty. This shouldn't happen as all epochs around the chain head are known. + return Err(Error::Other(format!( + "Couldn't find any matching EpochId for orphan chunk state witness with height {}", + witness_height + ))); + } + } + + // Orphan witness is OK, save it to the pool + tracing::debug!(target: "client", "Saving an orphaned ChunkStateWitness to orphan pool"); + self.chunk_validator.orphan_witness_pool.add_orphan_state_witness(witness, witness_size); + Ok(HandleOrphanWitnessOutcome::SavedToPool) + } + + fn partially_validate_orphan_witness_in_epoch( + &self, + witness: &ChunkStateWitness, + epoch_id: &EpochId, + ) -> Result<(), Error> { + let chunk_header = &witness.inner.chunk_header; + let witness_height = chunk_header.height_created(); + let witness_shard = chunk_header.shard_id(); + + // Validate shard_id + if !self.epoch_manager.get_shard_layout(&epoch_id)?.shard_ids().contains(&witness_shard) { + return Err(Error::InvalidChunkStateWitness(format!( + "Invalid shard_id in ChunkStateWitness: {}", + witness_shard + ))); + } + + // Reject witnesses for chunks for which this node isn't a validator. + // It's an error, as the sender shouldn't send the witness to a non-validator node. + let Some(my_signer) = self.chunk_validator.my_signer.as_ref() else { + return Err(Error::NotAValidator); + }; + let chunk_validator_assignments = self.epoch_manager.get_chunk_validator_assignments( + &epoch_id, + witness_shard, + witness_height, + )?; + if !chunk_validator_assignments.contains(my_signer.validator_id()) { + return Err(Error::NotAChunkValidator); + } + + // Verify signature + if !self.epoch_manager.verify_chunk_state_witness_signature_in_epoch(&witness, &epoch_id)? { + return Err(Error::InvalidChunkStateWitness("Invalid signature".to_string())); + } + + Ok(()) + } + + /// Once a new block arrives, we can process the orphaned chunk state witnesses that were waiting + /// for this block. This function takes the ready witnesses out of the orphan pool and processes them. + pub fn process_ready_orphan_chunk_state_witnesses(&mut self, new_block: &Block) { + let ready_witnesses = self + .chunk_validator + .orphan_witness_pool + .take_state_witnesses_waiting_for_block(new_block.hash()); + for witness in ready_witnesses { + let header = &witness.inner.chunk_header; + tracing::debug!( + target: "client", + witness_height = header.height_created(), + witness_shard = header.shard_id(), + witness_chunk = ?header.chunk_hash(), + witness_prev_block = ?header.prev_block_hash(), + "Processing an orphaned ChunkStateWitness, its previous block has arrived." + ); + if let Err(err) = self.process_chunk_state_witness_with_prev_block( + witness, + PeerId::random(), // TODO: Should peer_id even be here? https://github.com/near/stakewars-iv/issues/17 + new_block, + None, + ) { + tracing::error!(target: "client", ?err, "Error processing orphan chunk state witness"); + } + } + } +} + +/// Outcome of processing an orphaned witness. +/// If the witness is clearly invalid, it's rejected and the handler function produces an error. +/// Sometimes the witness might not be strictly invalid, but we still don't want to save it because +/// of other reasons. In such cases the handler function returns Ok(outcome) to let the caller +/// know what happened with the witness. +/// It's useful in tests.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HandleOrphanWitnessOutcome { + SavedToPool, + TooBig(usize), + TooFarFromHead { head_height: BlockHeight, witness_height: BlockHeight }, +} diff --git a/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs new file mode 100644 index 00000000000..c803e6de376 --- /dev/null +++ b/chain/client/src/stateless_validation/chunk_validator/orphan_witness_pool.rs @@ -0,0 +1,401 @@ +use lru::LruCache; +use near_chain_configs::default_orphan_state_witness_pool_size; +use near_primitives::hash::CryptoHash; +use near_primitives::stateless_validation::ChunkStateWitness; +use near_primitives::types::{BlockHeight, ShardId}; + +use metrics_tracker::OrphanWitnessMetricsTracker; + +/// `OrphanStateWitnessPool` is used to keep orphaned ChunkStateWitnesses until it's possible to process them. +/// To process a ChunkStateWitness we need to have the previous block, but it might happen that a ChunkStateWitness +/// shows up before the block is available. In such cases the witness is put in `OrphanStateWitnessPool` until the +/// required block arrives and the witness can be processed. +pub struct OrphanStateWitnessPool { + witness_cache: LruCache<(ShardId, BlockHeight), CacheEntry>, +} + +struct CacheEntry { + witness: ChunkStateWitness, + _metrics_tracker: OrphanWitnessMetricsTracker, +} + +impl OrphanStateWitnessPool { + /// Create a new `OrphanStateWitnessPool` with a capacity of `cache_capacity` witnesses. + /// The `Default` trait implementation provides reasonable defaults. + pub fn new(cache_capacity: usize) -> Self { + if cache_capacity > 128 { + tracing::warn!( + target: "client", + "OrphanStateWitnessPool capacity is set to {}, which is larger than expected. \ + OrphanStateWitnessPool uses a naive algorithm, using a large capacity might lead \ + to performance problems.", cache_capacity); + } + + OrphanStateWitnessPool { witness_cache: LruCache::new(cache_capacity) } + } + + /// Add an orphaned chunk state witness to the pool. The witness will be put in a cache and it'll + /// wait there for the block that's required to process it. + /// It's expected that this `ChunkStateWitness` has gone through basic validation - including signature, + /// shard_id, size and distance from the tip. The pool would still work without it, but without validation + /// it'd be possible to fill the whole cache with spam. + /// `witness_size` is only used for metrics, it's okay to pass 0 if you don't care about the metrics. 
+ pub fn add_orphan_state_witness(&mut self, witness: ChunkStateWitness, witness_size: usize) { + // Insert the new ChunkStateWitness into the cache + let chunk_header = &witness.inner.chunk_header; + let cache_key = (chunk_header.shard_id(), chunk_header.height_created()); + let metrics_tracker = OrphanWitnessMetricsTracker::new(&witness, witness_size); + let cache_entry = CacheEntry { witness, _metrics_tracker: metrics_tracker }; + if let Some((_, ejected_entry)) = self.witness_cache.push(cache_key, cache_entry) { + // Another witness has been ejected from the cache due to capacity limit + let header = &ejected_entry.witness.inner.chunk_header; + tracing::debug!( + target: "client", + ejected_witness_height = header.height_created(), + ejected_witness_shard = header.shard_id(), + ejected_witness_chunk = ?header.chunk_hash(), + ejected_witness_prev_block = ?header.prev_block_hash(), + "Ejecting an orphaned ChunkStateWitness from the cache due to capacity limit. It will not be processed." + ); + } + } + + /// Find all orphaned witnesses that were waiting for this block and remove them from the pool. + /// The block has arrived, so they can now be processed; they're no longer orphans. + pub fn take_state_witnesses_waiting_for_block( + &mut self, + prev_block: &CryptoHash, + ) -> Vec<ChunkStateWitness> { + let mut to_remove: Vec<(ShardId, BlockHeight)> = Vec::new(); + for (cache_key, cache_entry) in self.witness_cache.iter() { + if cache_entry.witness.inner.chunk_header.prev_block_hash() == prev_block { + to_remove.push(*cache_key); + } + } + let mut result = Vec::new(); + for cache_key in to_remove { + let ready_witness = self + .witness_cache + .pop(&cache_key) + .expect("The cache contains this entry, a moment ago it was iterated over"); + result.push(ready_witness.witness); + } + result + } +} + +impl Default for OrphanStateWitnessPool { + fn default() -> OrphanStateWitnessPool { + OrphanStateWitnessPool::new(default_orphan_state_witness_pool_size()) + } +} + +mod metrics_tracker { + use near_primitives::stateless_validation::ChunkStateWitness; + + use crate::metrics; + + /// OrphanWitnessMetricsTracker is a helper struct which leverages RAII to update + /// the metrics about witnesses in the orphan pool when they're added and removed. + /// Its constructor adds the witness to the metrics, and later its destructor + /// removes the witness from metrics. + /// Using this struct is much less error-prone than adjusting the metrics by hand.
+ pub struct OrphanWitnessMetricsTracker { + shard_id: String, + witness_size: usize, + } + + impl OrphanWitnessMetricsTracker { + pub fn new( + witness: &ChunkStateWitness, + witness_size: usize, + ) -> OrphanWitnessMetricsTracker { + let shard_id = witness.inner.chunk_header.shard_id().to_string(); + metrics::ORPHAN_CHUNK_STATE_WITNESSES_TOTAL_COUNT + .with_label_values(&[shard_id.as_str()]) + .inc(); + metrics::ORPHAN_CHUNK_STATE_WITNESS_POOL_SIZE + .with_label_values(&[shard_id.as_str()]) + .inc(); + metrics::ORPHAN_CHUNK_STATE_WITNESS_POOL_MEMORY_USED + .with_label_values(&[shard_id.as_str()]) + .add(witness_size_to_i64(witness_size)); + + OrphanWitnessMetricsTracker { shard_id, witness_size } + } + } + + impl Drop for OrphanWitnessMetricsTracker { + fn drop(&mut self) { + metrics::ORPHAN_CHUNK_STATE_WITNESS_POOL_SIZE + .with_label_values(&[self.shard_id.as_str()]) + .dec(); + metrics::ORPHAN_CHUNK_STATE_WITNESS_POOL_MEMORY_USED + .with_label_values(&[self.shard_id.as_str()]) + .sub(witness_size_to_i64(self.witness_size)); + } + } + + fn witness_size_to_i64(witness_size: usize) -> i64 { + witness_size.try_into().expect( + "Orphaned ChunkStateWitness size can't be converted to i64. \ + This should be impossible, is it over one exabyte in size?", + ) + } +} + +#[cfg(test)] +mod tests { + use near_primitives::hash::{hash, CryptoHash}; + use near_primitives::sharding::{ShardChunkHeader, ShardChunkHeaderInner}; + use near_primitives::stateless_validation::ChunkStateWitness; + use near_primitives::types::{BlockHeight, ShardId}; + + use super::OrphanStateWitnessPool; + + /// Make a dummy witness for testing + /// encoded_length is used to differentiate between witnesses with the same main parameters. + fn make_witness( + height: BlockHeight, + shard_id: ShardId, + prev_block_hash: CryptoHash, + encoded_length: u64, + ) -> ChunkStateWitness { + let mut witness = ChunkStateWitness::new_dummy(height, shard_id, prev_block_hash); + match &mut witness.inner.chunk_header { + ShardChunkHeader::V3(header) => match &mut header.inner { + ShardChunkHeaderInner::V2(inner) => inner.encoded_length = encoded_length, + _ => unimplemented!(), + }, + _ => unimplemented!(), + } + witness + } + + /// Generate fake block hash based on height + fn block(height: BlockHeight) -> CryptoHash { + hash(&height.to_be_bytes()) + } + + /// Assert that both Vecs are equal after sorting. It's order-independent, unlike the standard assert_eq! 
+ fn assert_contents(mut observed: Vec<ChunkStateWitness>, mut expected: Vec<ChunkStateWitness>) { + let sort_comparator = |witness1: &ChunkStateWitness, witness2: &ChunkStateWitness| { + let bytes1 = borsh::to_vec(witness1).unwrap(); + let bytes2 = borsh::to_vec(witness2).unwrap(); + bytes1.cmp(&bytes2) + }; + observed.sort_by(sort_comparator); + expected.sort_by(sort_comparator); + if observed != expected { + let print_witness_info = |witness: &ChunkStateWitness| { + let header = &witness.inner.chunk_header; + eprintln!( + "- height = {}, shard_id = {}, encoded_length: {}, prev_block: {}", + header.height_created(), + header.shard_id(), + header.encoded_length(), + header.prev_block_hash() + ); + }; + eprintln!("Mismatch!"); + eprintln!("Expected {} witnesses:", expected.len()); + for witness in expected { + print_witness_info(&witness); + } + eprintln!("Observed {} witnesses:", observed.len()); + for witness in observed { + print_witness_info(&witness); + } + eprintln!("=================="); + panic!("assert_contents failed"); + } + } + + // Check that the pool is empty, all witnesses have been removed + fn assert_empty(pool: &OrphanStateWitnessPool) { + assert_eq!(pool.witness_cache.len(), 0); + } + + /// Basic functionality - inserting witnesses and fetching them works as expected + #[test] + fn basic() { + let mut pool = OrphanStateWitnessPool::new(10); + + let witness1 = make_witness(100, 1, block(99), 0); + let witness2 = make_witness(100, 2, block(99), 0); + let witness3 = make_witness(101, 1, block(100), 0); + let witness4 = make_witness(101, 2, block(100), 0); + + pool.add_orphan_state_witness(witness1.clone(), 0); + pool.add_orphan_state_witness(witness2.clone(), 0); + pool.add_orphan_state_witness(witness3.clone(), 0); + pool.add_orphan_state_witness(witness4.clone(), 0); + + let waiting_for_99 = pool.take_state_witnesses_waiting_for_block(&block(99)); + assert_contents(waiting_for_99, vec![witness1, witness2]); + + let waiting_for_100 = pool.take_state_witnesses_waiting_for_block(&block(100)); + assert_contents(waiting_for_100, vec![witness3, witness4]); + + assert_empty(&pool); + } + + /// When a new witness is inserted with the same (shard_id, height) as an existing witness, the new witness + /// should replace the old one. The old one should be ejected from the pool. + #[test] + fn replacing() { + let mut pool = OrphanStateWitnessPool::new(10); + + // The old witness is replaced when the awaited block is the same + { + let witness1 = make_witness(100, 1, block(99), 0); + let witness2 = make_witness(100, 1, block(99), 1); + pool.add_orphan_state_witness(witness1, 0); + pool.add_orphan_state_witness(witness2.clone(), 0); + + let waiting_for_99 = pool.take_state_witnesses_waiting_for_block(&block(99)); + assert_contents(waiting_for_99, vec![witness2]); + } + + // The old witness is replaced when the awaited block is different, waiting_for_block is cleaned as expected + { + let witness3 = make_witness(102, 1, block(100), 0); + let witness4 = make_witness(102, 1, block(101), 0); + pool.add_orphan_state_witness(witness3, 0); + pool.add_orphan_state_witness(witness4.clone(), 0); + + let waiting_for_101 = pool.take_state_witnesses_waiting_for_block(&block(101)); + assert_contents(waiting_for_101, vec![witness4]); + + let waiting_for_100 = pool.take_state_witnesses_waiting_for_block(&block(100)); + assert_contents(waiting_for_100, vec![]); + } + + assert_empty(&pool); + } + + /// The pool has limited capacity. Once it hits the capacity, the least-recently used witness will be ejected.
+ #[test] + fn limited_capacity() { + let mut pool = OrphanStateWitnessPool::new(2); + + let witness1 = make_witness(102, 1, block(101), 0); + let witness2 = make_witness(101, 1, block(100), 0); + let witness3 = make_witness(101, 2, block(100), 0); + + pool.add_orphan_state_witness(witness1, 0); + pool.add_orphan_state_witness(witness2.clone(), 0); + + // Inserting the third witness causes the pool to go over capacity, so witness1 should be ejected. + pool.add_orphan_state_witness(witness3.clone(), 0); + + let waiting_for_100 = pool.take_state_witnesses_waiting_for_block(&block(100)); + assert_contents(waiting_for_100, vec![witness2, witness3]); + + // witness1 should be ejected, no one is waiting for block 101 + let waiting_for_101 = pool.take_state_witnesses_waiting_for_block(&block(101)); + assert_contents(waiting_for_101, vec![]); + + assert_empty(&pool); + } + + /// OrphanStateWitnessPool can handle large shard ids without any problems, it doesn't keep a Vec indexed by shard_id + #[test] + fn large_shard_id() { + let mut pool = OrphanStateWitnessPool::new(10); + + let large_shard_id = ShardId::MAX; + let witness = make_witness(101, large_shard_id, block(99), 0); + pool.add_orphan_state_witness(witness.clone(), 0); + + let waiting_for_99 = pool.take_state_witnesses_waiting_for_block(&block(99)); + assert_contents(waiting_for_99, vec![witness]); + + assert_empty(&pool); + } + + /// An OrphanStateWitnessPool with 0 capacity shouldn't crash, it should just ignore all witnesses + #[test] + fn zero_capacity() { + let mut pool = OrphanStateWitnessPool::new(0); + + pool.add_orphan_state_witness(make_witness(100, 1, block(99), 0), 0); + pool.add_orphan_state_witness(make_witness(100, 1, block(99), 0), 1); + pool.add_orphan_state_witness(make_witness(100, 2, block(99), 0), 0); + pool.add_orphan_state_witness(make_witness(101, 0, block(100), 0), 0); + + let waiting = pool.take_state_witnesses_waiting_for_block(&block(99)); + assert_contents(waiting, vec![]); + + assert_empty(&pool); + } + + /// OrphanStateWitnessPool has a Drop implementation which clears the metrics. + /// It's hard to test it because metrics are global and it could interfere with other tests, + /// but we can at least test that it doesn't crash. That's always something. + #[test] + fn destructor_doesnt_crash() { + let mut pool = OrphanStateWitnessPool::new(10); + pool.add_orphan_state_witness(make_witness(100, 0, block(99), 0), 0); + pool.add_orphan_state_witness(make_witness(100, 2, block(99), 0), 0); + pool.add_orphan_state_witness(make_witness(100, 2, block(99), 0), 1); + pool.add_orphan_state_witness(make_witness(101, 0, block(100), 0), 0); + std::mem::drop(pool); + } + + /// A longer test scenario + #[test] + fn scenario() { + let mut pool = OrphanStateWitnessPool::new(5); + + // Witnesses for shards 0, 1, 2, 3 at height 100, looking for block 99 + let witness0 = make_witness(100, 0, block(99), 0); + let witness1 = make_witness(100, 1, block(99), 0); + let witness2 = make_witness(100, 2, block(99), 0); + let witness3 = make_witness(100, 3, block(99), 0); + pool.add_orphan_state_witness(witness0, 0); + pool.add_orphan_state_witness(witness1, 0); + pool.add_orphan_state_witness(witness2, 0); + pool.add_orphan_state_witness(witness3, 0); + + // Another witness on shard 1, height 100.
Should replace witness1 + let witness5 = make_witness(100, 1, block(99), 1); + pool.add_orphan_state_witness(witness5.clone(), 0); + + // Witnesses for shards 0, 1, 2, 3 at height 101, looking for block 100 + let witness6 = make_witness(101, 0, block(100), 0); + let witness7 = make_witness(101, 1, block(100), 0); + let witness8 = make_witness(101, 2, block(100), 0); + let witness9 = make_witness(101, 3, block(100), 0); + pool.add_orphan_state_witness(witness6, 0); + pool.add_orphan_state_witness(witness7.clone(), 0); + pool.add_orphan_state_witness(witness8.clone(), 0); + pool.add_orphan_state_witness(witness9.clone(), 0); + + // Pool capacity is 5, so three witnesses at height 100 should be ejected. + // The only surviving witness should be witness5, which was the freshest one among them + let looking_for_99 = pool.take_state_witnesses_waiting_for_block(&block(99)); + assert_contents(looking_for_99, vec![witness5]); + + // Let's add a few more witnesses + let witness10 = make_witness(102, 1, block(101), 0); + let witness11 = make_witness(102, 4, block(100), 0); + let witness12 = make_witness(102, 1, block(77), 0); + pool.add_orphan_state_witness(witness10, 0); + pool.add_orphan_state_witness(witness11.clone(), 0); + pool.add_orphan_state_witness(witness12.clone(), 0); + + // Check that witnesses waiting for block 100 are correct + let waiting_for_100 = pool.take_state_witnesses_waiting_for_block(&block(100)); + assert_contents(waiting_for_100, vec![witness7, witness8, witness9, witness11]); + + // At this point the pool contains only witness12, no one should be waiting for block 101. + let waiting_for_101 = pool.take_state_witnesses_waiting_for_block(&block(101)); + assert_contents(waiting_for_101, vec![]); + + let waiting_for_77 = pool.take_state_witnesses_waiting_for_block(&block(77)); + assert_contents(waiting_for_77, vec![witness12]); + + assert_empty(&pool); + } +} diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs index 69861e9fc5e..1515772016b 100644 --- a/chain/client/src/test_utils/test_env.rs +++ b/chain/client/src/test_utils/test_env.rs @@ -1,6 +1,6 @@ use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; -use std::time::Instant; +use std::time::{Duration, Instant}; use crate::stateless_validation::processing_tracker::{ ProcessingDoneTracker, ProcessingDoneWaiter, @@ -9,6 +9,7 @@ use crate::Client; use near_async::messaging::{CanSend, IntoMultiSender}; use near_async::time::Clock; use near_chain::test_utils::ValidatorSchedule; +use near_chain::types::Tip; use near_chain::{ChainGenesis, Provenance}; use near_chain_configs::GenesisConfig; use near_chain_primitives::error::QueryError; @@ -29,11 +30,11 @@ use near_primitives::epoch_manager::RngSeed; use near_primitives::errors::InvalidTxError; use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; -use near_primitives::sharding::PartialEncodedChunk; -use near_primitives::stateless_validation::ChunkStateWitness; +use near_primitives::sharding::{ChunkHash, PartialEncodedChunk}; +use near_primitives::stateless_validation::{ChunkEndorsement, ChunkStateWitness}; use near_primitives::test_utils::create_test_signer; use near_primitives::transaction::{Action, FunctionCallAction, SignedTransaction}; -use near_primitives::types::{AccountId, Balance, BlockHeight, EpochId, NumSeats}; +use near_primitives::types::{AccountId, Balance, BlockHeight, EpochId, NumSeats, ShardId}; use near_primitives::utils::MaybeValidated; use 
near_primitives::version::ProtocolVersion; use near_primitives::views::{ @@ -47,6 +48,9 @@ use super::setup::setup_client_with_runtime; use super::test_env_builder::TestEnvBuilder; use super::TEST_SEED; +/// Timeout used in tests that wait for a specific chunk endorsement to appear +const CHUNK_ENDORSEMENTS_TIMEOUT: Duration = Duration::from_secs(10); + /// An environment for writing integration tests with multiple clients. /// This environment can simulate near nodes without network and it can be configured to use different runtimes. pub struct TestEnv { @@ -382,6 +386,43 @@ impl TestEnv { self.propagate_chunk_endorsements(allow_errors); } + /// Wait until an endorsement for `chunk_hash` appears in the network messages sent by + /// the Client with index `client_idx`. Times out after CHUNK_ENDORSEMENTS_TIMEOUT. + /// Doesn't process or consume the message, it just waits until the message appears on the network_adapter. + pub fn wait_for_chunk_endorsement( + &mut self, + client_idx: usize, + chunk_hash: &ChunkHash, + ) -> Result<ChunkEndorsement, TimeoutError> { + let start_time = Instant::now(); + let network_adapter = self.network_adapters[client_idx].clone(); + loop { + let mut endorsement_opt = None; + network_adapter.handle_filtered(|request| { + match &request { + PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::ChunkEndorsement(_receiver_account_id, endorsement), + ) if endorsement.chunk_hash() == chunk_hash => { + endorsement_opt = Some(endorsement.clone()); + } + _ => {} + }; + Some(request) + }); + + if let Some(endorsement) = endorsement_opt { + return Ok(endorsement); + } + + let elapsed_since_start = start_time.elapsed(); + if elapsed_since_start > CHUNK_ENDORSEMENTS_TIMEOUT { + return Err(TimeoutError(elapsed_since_start)); + } + + std::thread::sleep(Duration::from_millis(50)); + } + } + pub fn send_money(&mut self, id: usize) -> ProcessTxResponse { let account_id = self.get_client_id(0); let signer = @@ -553,6 +594,42 @@ impl TestEnv { self.clients[idx].validator_signer.as_ref().unwrap().validator_id() } + /// Returns the index of client with the given [`AccountId`]. + pub fn get_client_index(&self, account_id: &AccountId) -> usize { + self.account_indices.index(account_id) + } + + /// Get block producer responsible for producing the block at height head.height + height_offset. + /// Doesn't handle epoch boundaries with height_offset > 1. With offsets bigger than one, + /// the function assumes that the epoch doesn't change after head.height + 1. + pub fn get_block_producer_at_offset(&self, head: &Tip, height_offset: u64) -> AccountId { + let client = &self.clients[0]; + let epoch_manager = &client.epoch_manager; + let parent_hash = &head.last_block_hash; + let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash).unwrap(); + let height = head.height + height_offset; + + epoch_manager.get_block_producer(&epoch_id, height).unwrap() + } + + /// Get chunk producer responsible for producing the chunk at height head.height + height_offset. + /// Doesn't handle epoch boundaries with height_offset > 1. With offsets bigger than one, + /// the function assumes that the epoch doesn't change after head.height + 1.
+ pub fn get_chunk_producer_at_offset( + &self, + head: &Tip, + height_offset: u64, + shard_id: ShardId, + ) -> AccountId { + let client = &self.clients[0]; + let epoch_manager = &client.epoch_manager; + let parent_hash = &head.last_block_hash; + let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash).unwrap(); + let height = head.height + height_offset; + + epoch_manager.get_chunk_producer(&epoch_id, height, shard_id).unwrap() + } + pub fn get_runtime_config(&self, idx: usize, epoch_id: EpochId) -> RuntimeConfig { self.clients[idx].runtime_adapter.get_protocol_config(&epoch_id).unwrap().runtime_config } @@ -685,3 +762,7 @@ impl AccountIndices { &mut container[self.0[account_id]] } } + +#[derive(thiserror::Error, Debug)] +#[error("Timed out after {0:?}")] +pub struct TimeoutError(Duration); diff --git a/chain/epoch-manager/Cargo.toml b/chain/epoch-manager/Cargo.toml index 51940e234d5..5f43d7d3d30 100644 --- a/chain/epoch-manager/Cargo.toml +++ b/chain/epoch-manager/Cargo.toml @@ -24,6 +24,7 @@ tracing.workspace = true # itertools has collect_vec which is useful in quick debugging prints itertools.workspace = true +near-o11y.workspace = true near-crypto.workspace = true near-primitives.workspace = true near-store.workspace = true @@ -38,6 +39,7 @@ protocol_feature_fix_staking_threshold = [ ] nightly = [ "near-chain-configs/nightly", + "near-o11y/nightly", "near-primitives/nightly", "near-store/nightly", "nightly_protocol", @@ -45,6 +47,7 @@ nightly = [ ] nightly_protocol = [ "near-chain-configs/nightly_protocol", + "near-o11y/nightly_protocol", "near-primitives/nightly_protocol", "near-store/nightly_protocol", ] diff --git a/chain/epoch-manager/src/adapter.rs b/chain/epoch-manager/src/adapter.rs index 629db77fdaf..40dfd7d7e93 100644 --- a/chain/epoch-manager/src/adapter.rs +++ b/chain/epoch-manager/src/adapter.rs @@ -4,6 +4,7 @@ use crate::EpochInfoAggregator; use crate::EpochManagerHandle; use near_chain_primitives::Error; use near_crypto::Signature; +use near_primitives::block::Tip; use near_primitives::block_header::{Approval, ApprovalInner, BlockHeader}; use near_primitives::epoch_manager::block_info::BlockInfo; use near_primitives::epoch_manager::epoch_info::EpochInfo; @@ -415,6 +416,12 @@ pub trait EpochManagerAdapter: Send + Sync { state_witness: &ChunkStateWitness, ) -> Result<bool, Error>; + fn verify_chunk_state_witness_signature_in_epoch( + &self, + state_witness: &ChunkStateWitness, + epoch_id: &EpochId, + ) -> Result<bool, Error>; + fn cares_about_shard_from_prev_block( &self, parent_hash: &CryptoHash, @@ -431,6 +438,20 @@ pub trait EpochManagerAdapter: Send + Sync { fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result<bool, Error>; + /// Tries to estimate in which epoch the given height would reside. + /// Looks at the previous, current and next epoch around the tip + /// and adds them to the result if the height might be inside the epoch. + /// It returns a list of possible epochs instead of a single value + /// because sometimes it's impossible to determine the exact epoch + /// in which the height will be. The exact starting height of the + /// next epoch isn't known until it actually starts, so it's impossible + /// to determine the exact epoch for heights which are ahead of the tip. + fn possible_epochs_of_height_around_tip( + &self, + tip: &Tip, + height: BlockHeight, + ) -> Result<Vec<EpochId>, EpochError>; + /// Returns a vector of all hashes in the epoch ending with `last_block_info`. /// Only return blocks on chain of `last_block_info`.
/// Hashes are returned in the order from the last block to the first block. @@ -1048,6 +1069,16 @@ impl EpochManagerAdapter for EpochManagerHandle { let chunk_header = &state_witness.inner.chunk_header; let epoch_id = epoch_manager.get_epoch_id_from_prev_block(chunk_header.prev_block_hash())?; + self.verify_chunk_state_witness_signature_in_epoch(state_witness, &epoch_id) + } + + fn verify_chunk_state_witness_signature_in_epoch( + &self, + state_witness: &ChunkStateWitness, + epoch_id: &EpochId, + ) -> Result<bool, Error> { + let epoch_manager = self.read(); + let chunk_header = &state_witness.inner.chunk_header; let chunk_producer = epoch_manager.get_chunk_producer_info( &epoch_id, chunk_header.height_created(), @@ -1087,6 +1118,15 @@ impl EpochManagerAdapter for EpochManagerHandle { epoch_manager.will_shard_layout_change(parent_hash) } + fn possible_epochs_of_height_around_tip( + &self, + tip: &Tip, + height: BlockHeight, + ) -> Result<Vec<EpochId>, EpochError> { + let epoch_manager = self.read(); + epoch_manager.possible_epochs_of_height_around_tip(tip, height) + } + #[cfg(feature = "new_epoch_sync")] fn get_all_epoch_hashes( &self, diff --git a/chain/epoch-manager/src/lib.rs b/chain/epoch-manager/src/lib.rs index 2200fb90374..ca3e927a1e9 100644 --- a/chain/epoch-manager/src/lib.rs +++ b/chain/epoch-manager/src/lib.rs @@ -2,6 +2,7 @@ use crate::proposals::proposals_to_epoch_info; use crate::types::EpochInfoAggregator; use near_cache::SyncLruCache; use near_chain_configs::GenesisConfig; +use near_primitives::block::Tip; use near_primitives::checked_feature; use near_primitives::epoch_manager::block_info::BlockInfo; use near_primitives::epoch_manager::epoch_info::{EpochInfo, EpochSummary}; @@ -1904,6 +1905,81 @@ impl EpochManager { } } + pub fn possible_epochs_of_height_around_tip( + &self, + tip: &Tip, + height: BlockHeight, + ) -> Result<Vec<EpochId>, EpochError> { + // If the tip is at the genesis block, it has to be handled in a special way. + // For genesis block, epoch_first_block() is the dummy block (11111...) + // with height 0, which could cause issues with estimating the epoch end + // if the genesis height is nonzero. It's easier to handle it manually. + if tip.prev_block_hash == CryptoHash::default() { + if tip.height == height { + return Ok(vec![tip.epoch_id.clone()]); + } + + if height > tip.height { + return Ok(vec![tip.next_epoch_id.clone()]); + } + + return Ok(vec![]); + } + + // See if the height is in the current epoch + let current_epoch_first_block_hash = + *self.get_block_info(&tip.last_block_hash)?.epoch_first_block(); + let current_epoch_first_block_info = + self.get_block_info(&current_epoch_first_block_hash)?; + + let current_epoch_start = current_epoch_first_block_info.height(); + let current_epoch_length = self.get_epoch_config(&tip.epoch_id)?.epoch_length; + let current_epoch_estimated_end = current_epoch_start.saturating_add(current_epoch_length); + + // All blocks with height lower than the estimated end are guaranteed to reside in the current epoch. + // The situation is clear here. + if (current_epoch_start..current_epoch_estimated_end).contains(&height) { + return Ok(vec![tip.epoch_id.clone()]); + } + + // If the height is higher than the current epoch's estimated end, then it's + // not clear in which epoch it'll be. Under normal circumstances it would be + // in the next epoch, but with missing blocks the current epoch could stretch out + // past its estimated end, so the height might end up being in the current epoch, + // even though its height is higher than the estimated end.
+ if height >= current_epoch_estimated_end { + return Ok(vec![tip.epoch_id.clone(), tip.next_epoch_id.clone()]); + } + + // Finally try the previous epoch. + // First and last blocks of the previous epoch are already known, so the situation is clear. + let prev_epoch_last_block_hash = current_epoch_first_block_info.prev_hash(); + let prev_epoch_last_block_info = self.get_block_info(prev_epoch_last_block_hash)?; + let prev_epoch_first_block_info = + self.get_block_info(prev_epoch_last_block_info.epoch_first_block())?; + + // If the current epoch is the epoch after genesis, then the previous + // epoch contains only the genesis block. This case has to be handled separately + // because epoch_first_block() points to the dummy block (1111..), which has height 0. + if tip.epoch_id == EpochId(CryptoHash::default()) { + let genesis_block_info = prev_epoch_last_block_info; + if height == genesis_block_info.height() { + return Ok(vec![genesis_block_info.epoch_id().clone()]); + } else { + return Ok(vec![]); + } + } + + if (prev_epoch_first_block_info.height()..=prev_epoch_last_block_info.height()) + .contains(&height) + { + return Ok(vec![prev_epoch_last_block_info.epoch_id().clone()]); + } + + // The height doesn't belong to any of the epochs around the tip, return an empty Vec. + Ok(vec![]) + } + #[cfg(feature = "new_epoch_sync")] pub fn get_all_epoch_hashes_from_db( &self, diff --git a/chain/epoch-manager/src/tests/mod.rs b/chain/epoch-manager/src/tests/mod.rs index e403a728cd7..94d504ffdc1 100644 --- a/chain/epoch-manager/src/tests/mod.rs +++ b/chain/epoch-manager/src/tests/mod.rs @@ -9,7 +9,9 @@ use crate::test_utils::{ record_with_block_info, reward, setup_default_epoch_manager, setup_epoch_manager, stake, DEFAULT_TOTAL_SUPPLY, }; +use near_o11y::testonly::init_test_logger; use near_primitives::account::id::AccountIdRef; +use near_primitives::block::Tip; use near_primitives::challenge::SlashedValidator; use near_primitives::epoch_manager::EpochConfig; use near_primitives::hash::hash; @@ -2809,3 +2811,334 @@ fn test_verify_chunk_state_witness() { bad_signer.sign_chunk_state_witness(&chunk_state_witness.inner).0; assert!(!epoch_manager.verify_chunk_state_witness_signature(&chunk_state_witness).unwrap()); } + +/// Simulate the blockchain over a few epochs and verify that possible_epochs_of_height_around_tip() +/// gives the correct results at each step. +/// Some of the blocks are missing to make the test more interesting. 
+/// The blocks present in each epoch are: +/// Epoch(111): genesis +/// Epoch(111): 1, 2, 3, 4, 5 +/// epoch1: 6, 7, 8, 9, 10 +/// epoch2: 12, 14, 16, 18, 20, 22, 24, 25, 26 +/// epoch3: 27, 28, 29, 30, 31 +/// epoch4: 32+ +#[test] +fn test_possible_epochs_of_height_around_tip() { + use std::str::FromStr; + init_test_logger(); + + let amount_staked = 1_000_000; + let account_id = AccountId::from_str("test1").unwrap(); + let validators = vec![(account_id, amount_staked)]; + let h = hash_range(50); + + let genesis_epoch = EpochId(CryptoHash::default()); + + let epoch_length = 5; + let mut epoch_manager = setup_default_epoch_manager(validators, epoch_length, 1, 2, 2, 90, 60); + + // Add the genesis block with height 1000 + let genesis_height = 1000; + record_block(&mut epoch_manager, CryptoHash::default(), h[0], genesis_height, vec![]); + + let genesis_tip = Tip { + height: genesis_height, + last_block_hash: h[0], + prev_block_hash: CryptoHash::default(), + epoch_id: genesis_epoch.clone(), + next_epoch_id: genesis_epoch.clone(), + }; + + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&genesis_tip, 0).unwrap(), + vec![] + ); + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&genesis_tip, genesis_height - 1) + .unwrap(), + vec![] + ); + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&genesis_tip, genesis_height).unwrap(), + vec![genesis_epoch.clone()] + ); + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&genesis_tip, genesis_height + 1) + .unwrap(), + vec![genesis_epoch.clone()] + ); + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&genesis_tip, 10000000).unwrap(), + vec![genesis_epoch.clone()] + ); + + let epoch1 = EpochId(h[0]); + tracing::info!(target: "test", ?epoch1); + + // Add blocks with heights 1..5, a standard epoch with no surprises + for i in 1..=5 { + let height = genesis_height + i as BlockHeight; + tracing::info!(target: "test", height); + record_block(&mut epoch_manager, h[i - 1], h[i], height, vec![]); + let tip = Tip { + height, + last_block_hash: h[i], + prev_block_hash: h[i - 1], + epoch_id: genesis_epoch.clone(), + next_epoch_id: epoch1.clone(), + }; + assert_eq!(epoch_manager.possible_epochs_of_height_around_tip(&tip, 0).unwrap(), vec![]); + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, genesis_height).unwrap(), + vec![genesis_epoch.clone()] + ); + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, genesis_height + 1).unwrap(), + vec![genesis_epoch.clone()] + ); + for h in 1..=5 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h as BlockHeight) + .unwrap(), + vec![genesis_epoch.clone()] + ); + } + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, genesis_height + 6).unwrap(), + vec![genesis_epoch.clone(), epoch1.clone()] + ); + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, 1000000).unwrap(), + vec![genesis_epoch.clone(), epoch1.clone()] + ); + } + + let epoch2 = EpochId(h[5]); + tracing::info!(target: "test", ?epoch2); + + // Add blocks with heights 6..10, also a standard epoch with no surprises + for i in 6..=10 { + let height = genesis_height + i as BlockHeight; + tracing::info!(target: "test", height); + record_block(&mut epoch_manager, h[i - 1], h[i], height, vec![]); + let tip = Tip { + height, + last_block_hash: h[i], + prev_block_hash: h[i - 1], + epoch_id: epoch1.clone(), + next_epoch_id: epoch2.clone(), + }; + 
assert_eq!(epoch_manager.possible_epochs_of_height_around_tip(&tip, 0).unwrap(), vec![]); + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, genesis_height).unwrap(), + vec![] + ); + for h in 1..=5 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![genesis_epoch.clone()] + ); + } + for h in 6..=10 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h as BlockHeight) + .unwrap(), + vec![epoch1.clone()] + ); + } + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, genesis_height + 11).unwrap(), + vec![epoch1.clone(), epoch2.clone()] + ); + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, 1000000).unwrap(), + vec![epoch1.clone(), epoch2.clone()] + ); + } + + let epoch3 = EpochId(h[10]); + tracing::info!(target: "test", ?epoch3); + + // Now there is a very long epoch with no final blocks (all odd blocks are missing) + // For all the blocks inside this loop the last final block will be block #8, as it has #9 and #10 + // on top of it. + let last_final_block_hash = h[8]; + let last_finalized_height = genesis_height + 8; + for i in (12..=24).filter(|i| i % 2 == 0) { + let height = genesis_height + i as BlockHeight; + tracing::info!(target: "test", height); + let block_info = block_info( + h[i], + height, + last_finalized_height, + last_final_block_hash, + h[i - 2], + h[12], + vec![], + DEFAULT_TOTAL_SUPPLY, + ); + epoch_manager.record_block_info(block_info, [0; 32]).unwrap().commit().unwrap(); + let tip = Tip { + height, + last_block_hash: h[i], + prev_block_hash: h[i - 2], + epoch_id: epoch2.clone(), + next_epoch_id: epoch3.clone(), + }; + for h in 0..=5 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![] + ); + } + for h in 6..=10 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch1.clone()] + ); + } + // Block 11 isn't in any epoch. Block 10 was the last of the previous epoch and block 12 + // is the first one of the new epoch. Block 11 was skipped and doesn't belong to any epoch. + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, genesis_height + 11).unwrap(), + vec![] + ); + for h in 12..17 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch2.clone()] + ); + } + for h in 17..=24 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch2.clone(), epoch3.clone()] + ); + } + } + + // Finally there are two consecutive blocks on top of block 24 which + // make block 24 final and finalize epoch2.
+ for i in [25, 26] { + let height = genesis_height + i as BlockHeight; + tracing::info!(target: "test", height); + let block_info = block_info( + h[i], + height, + if i == 26 { genesis_height + 24 } else { last_finalized_height }, + if i == 26 { h[24] } else { last_final_block_hash }, + h[i - 1], + h[12], + vec![], + DEFAULT_TOTAL_SUPPLY, + ); + epoch_manager.record_block_info(block_info, [0; 32]).unwrap().commit().unwrap(); + let tip = Tip { + height, + last_block_hash: h[i], + prev_block_hash: h[i - 1], + epoch_id: epoch2.clone(), + next_epoch_id: epoch3.clone(), + }; + for h in 0..=5 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![] + ); + } + for h in 6..=10 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch1.clone()] + ); + } + // Block 11 isn't in any epoch. Block 10 was the last of the previous epoch and block 12 + // is the first one of the new epoch. Block 11 was skipped and doesn't belong to any epoch. + assert_eq!( + epoch_manager.possible_epochs_of_height_around_tip(&tip, genesis_height + 11).unwrap(), + vec![] + ); + for h in 12..17 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch2.clone()] + ); + } + for h in 17..=26 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch2.clone(), epoch3.clone()] + ); + } + } + + // One more epoch without surprises to make sure that the previous weird epoch is handled correctly + let epoch4 = EpochId(h[12]); + for i in 27..=31 { + let height = genesis_height + i as BlockHeight; + tracing::info!(target: "test", height); + record_block(&mut epoch_manager, h[i - 1], h[i], height, vec![]); + let tip = Tip { + height, + last_block_hash: h[i], + prev_block_hash: h[i - 1], + epoch_id: epoch3.clone(), + next_epoch_id: epoch4.clone(), + }; + assert_eq!(epoch_manager.possible_epochs_of_height_around_tip(&tip, 0).unwrap(), vec![]); + for h in 0..=11 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![] + ); + } + for h in 12..=26 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch2.clone()] + ); + } + for h in 27..=31 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch3.clone()] + ); + } + for h in 32..40 { + assert_eq!( + epoch_manager + .possible_epochs_of_height_around_tip(&tip, genesis_height + h) + .unwrap(), + vec![epoch3.clone(), epoch4.clone()] + ); + } + } +} diff --git a/chain/network/src/client.rs b/chain/network/src/client.rs index 8859a1e54e4..6319f2a508c 100644 --- a/chain/network/src/client.rs +++ b/chain/network/src/client.rs @@ -117,7 +117,6 @@ pub struct AnnounceAccountRequest(pub Vec<(AnnounceAccount, Option<EpochId>)>); pub struct ChunkStateWitnessMessage { pub witness: ChunkStateWitness, pub peer_id: PeerId, - pub attempts_remaining: usize, } #[derive(actix::Message, Debug, Clone, PartialEq, Eq)] diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 36f81d834c9..f1df8d81104 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -1012,11 +1012,7 @@ impl PeerActor { RoutedMessageBody::ChunkStateWitness(witness) => { network_state .client -
.send_async(ChunkStateWitnessMessage { - witness, - peer_id, - attempts_remaining: 5, - }) + .send_async(ChunkStateWitnessMessage { witness, peer_id }) .await .ok(); None diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index b26110128f7..df0af6eae38 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -301,6 +301,11 @@ pub fn default_produce_chunk_add_transactions_time_limit() -> Option<Duration> { Some(Duration::from_millis(200)) } +pub fn default_orphan_state_witness_pool_size() -> usize { + // With 5 shards, a capacity of 25 witnesses allows storing 5 orphan witnesses per shard. + 25 +} + /// Config for the Chunk Distribution Network feature. /// This allows nodes to push and pull chunks from a central stream. /// The two benefits of this approach are: (1) less request/response traffic @@ -443,6 +448,10 @@ pub struct ClientConfig { /// Nodes not participating will still function fine, but possibly with higher /// latency due to the need of requesting chunks over the peer-to-peer network. pub chunk_distribution_network: Option<ChunkDistributionNetworkConfig>, + /// OrphanStateWitnessPool keeps instances of ChunkStateWitness which can't be processed + /// because the previous block isn't available. The witnesses wait in the pool until the + /// required block appears. This field controls how many witnesses can be stored in the pool. + pub orphan_state_witness_pool_size: usize, } impl ClientConfig { @@ -527,6 +536,7 @@ impl ClientConfig { "produce_chunk_add_transactions_time_limit", ), chunk_distribution_network: None, + orphan_state_witness_pool_size: default_orphan_state_witness_pool_size(), } } } diff --git a/core/chain-configs/src/lib.rs b/core/chain-configs/src/lib.rs index 304edf608d0..8790ac5421b 100644 --- a/core/chain-configs/src/lib.rs +++ b/core/chain-configs/src/lib.rs @@ -10,15 +10,16 @@ pub use client_config::{ default_enable_multiline_logging, default_epoch_sync_enabled, default_header_sync_expected_height_per_second, default_header_sync_initial_timeout, default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout, - default_log_summary_period, default_produce_chunk_add_transactions_time_limit, - default_state_sync, default_state_sync_enabled, default_state_sync_timeout, - default_sync_check_period, default_sync_height_threshold, default_sync_step_period, - default_transaction_pool_size_limit, default_trie_viewer_state_size_limit, - default_tx_routing_height_horizon, default_view_client_threads, - default_view_client_throttle_period, ChunkDistributionNetworkConfig, ChunkDistributionUris, - ClientConfig, DumpConfig, ExternalStorageConfig, ExternalStorageLocation, GCConfig, - LogSummaryStyle, ReshardingConfig, ReshardingHandle, StateSyncConfig, SyncConfig, - DEFAULT_GC_NUM_EPOCHS_TO_KEEP, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL, + default_log_summary_period, default_orphan_state_witness_pool_size, + default_produce_chunk_add_transactions_time_limit, default_state_sync, + default_state_sync_enabled, default_state_sync_timeout, default_sync_check_period, + default_sync_height_threshold, default_sync_step_period, default_transaction_pool_size_limit, + default_trie_viewer_state_size_limit, default_tx_routing_height_horizon, + default_view_client_threads, default_view_client_throttle_period, + ChunkDistributionNetworkConfig, ChunkDistributionUris, ClientConfig, DumpConfig, + ExternalStorageConfig, ExternalStorageLocation, GCConfig, LogSummaryStyle, ReshardingConfig, + ReshardingHandle,
StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP, + DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL, MIN_GC_NUM_EPOCHS_TO_KEEP, TEST_STATE_SYNC_TIMEOUT, }; diff --git a/core/primitives/src/stateless_validation.rs b/core/primitives/src/stateless_validation.rs index bd6f6a23ff3..afdcb31eff1 100644 --- a/core/primitives/src/stateless_validation.rs +++ b/core/primitives/src/stateless_validation.rs @@ -1,13 +1,13 @@ use std::collections::{HashMap, HashSet}; use crate::challenge::PartialState; -use crate::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader}; +use crate::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader, ShardChunkHeaderV3}; use crate::transaction::SignedTransaction; -use crate::validator_signer::ValidatorSigner; +use crate::validator_signer::{EmptyValidatorSigner, ValidatorSigner}; use borsh::{BorshDeserialize, BorshSerialize}; use near_crypto::{PublicKey, Signature}; use near_primitives_core::hash::CryptoHash; -use near_primitives_core::types::{AccountId, Balance}; +use near_primitives_core::types::{AccountId, Balance, BlockHeight, ShardId}; /// An arbitrary static string to make sure that this struct cannot be /// serialized to look identical to another serialized struct. For chunk @@ -118,6 +118,43 @@ impl ChunkStateWitnessInner { } } +impl ChunkStateWitness { + // Make a new dummy ChunkStateWitness for testing. + pub fn new_dummy( + height: BlockHeight, + shard_id: ShardId, + prev_block_hash: CryptoHash, + ) -> ChunkStateWitness { + let header = ShardChunkHeader::V3(ShardChunkHeaderV3::new( + prev_block_hash, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + height, + shard_id, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + &EmptyValidatorSigner::default(), + )); + let inner = ChunkStateWitnessInner::new( + header, + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ); + ChunkStateWitness { inner, signature: Signature::default() } + } +} + /// Represents the base state and the expected post-state-root of a chunk's state /// transition. The actual state transition itself is not included here. 
#[derive(Debug, Default, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] diff --git a/integration-tests/src/tests/client/features.rs b/integration-tests/src/tests/client/features.rs index e4c069e63fd..39860298cac 100644 --- a/integration-tests/src/tests/client/features.rs +++ b/integration-tests/src/tests/client/features.rs @@ -18,6 +18,7 @@ mod lower_storage_key_limit; mod nearvm; #[cfg(feature = "protocol_feature_nonrefundable_transfer_nep491")] mod nonrefundable_transfer; +mod orphan_chunk_state_witness; mod restore_receipts_after_fix_apply_chunks; mod restrict_tla; mod stateless_validation; diff --git a/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs b/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs new file mode 100644 index 00000000000..75faea53f94 --- /dev/null +++ b/integration-tests/src/tests/client/features/orphan_chunk_state_witness.rs @@ -0,0 +1,422 @@ +use std::collections::HashSet; + +use near_chain::{Block, Provenance}; +use near_chain_configs::Genesis; +use near_client::test_utils::TestEnv; +use near_client::{Client, ProcessingDoneTracker, ProcessingDoneWaiter}; +use near_client::{HandleOrphanWitnessOutcome, MAX_ORPHAN_WITNESS_SIZE}; +use near_crypto::Signature; +use near_network::types::{NetworkRequests, PeerManagerMessageRequest}; +use near_o11y::testonly::init_integration_logger; +use near_primitives::merkle::{Direction, MerklePathItem}; +use near_primitives::network::PeerId; +use near_primitives::sharding::{ + ChunkHash, ReceiptProof, ShardChunkHeader, ShardChunkHeaderInner, ShardChunkHeaderInnerV2, + ShardProof, +}; +use near_primitives::stateless_validation::ChunkStateWitness; +use near_primitives_core::checked_feature; +use near_primitives_core::hash::CryptoHash; +use near_primitives_core::types::AccountId; +use near_primitives_core::version::PROTOCOL_VERSION; +use nearcore::config::GenesisExt; +use nearcore::test_utils::TestEnvNightshadeSetupExt; + +struct OrphanWitnessTestEnv { + env: TestEnv, + block1: Block, + block2: Block, + witness: ChunkStateWitness, + excluded_validator: AccountId, + excluded_validator_idx: usize, + chunk_producer: AccountId, +} + +/// This function prepares a scenario in which an orphaned chunk witness will occur. +/// It creates two blocks (`block1` and `block2`), but doesn't pass them to `excluded_validator`. +/// When `excluded_validator` receives a witness for the chunk belonging to `block2`, it doesn't +/// have `block1` which is required to process the witness, so it becomes an orphaned state witness. 
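+/// The returned `OrphanWitnessTestEnv` exposes the two withheld blocks, the intercepted witness,
+/// and the identities of the excluded validator and the chunk producer, so each test below can
+/// decide when, and in what form, to deliver them.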
+fn setup_orphan_witness_test() -> OrphanWitnessTestEnv { + let accounts: Vec<AccountId> = (0..4).map(|i| format!("test{i}").parse().unwrap()).collect(); + let genesis = Genesis::test(accounts.clone(), accounts.len().try_into().unwrap()); + let mut env = TestEnv::builder(&genesis.config) + .clients(accounts.clone()) + .validators(accounts.clone()) + .nightshade_runtimes(&genesis) + .build(); + + // Run the blockchain for a few blocks + for height in 1..4 { + // Produce the next block + let tip = env.clients[0].chain.head().unwrap(); + let block_producer = env.get_block_producer_at_offset(&tip, 1); + tracing::info!(target: "test", "Producing block at height: {height} by {block_producer}"); + let block = env.client(&block_producer).produce_block(tip.height + 1).unwrap().unwrap(); + tracing::info!(target: "test", "Block produced at height {} has chunk {:?}", height, block.chunks()[0].chunk_hash()); + + // The first block after genesis doesn't have any chunks, but all other blocks should have a new chunk inside. + if height > 1 { + assert_eq!( + block.chunks()[0].height_created(), + block.header().height(), + "There should be no missing chunks." + ); + } + + // Pass network messages around + for i in 0..env.clients.len() { + let blocks_processed = + env.clients[i].process_block_test(block.clone().into(), Provenance::NONE).unwrap(); + assert_eq!(blocks_processed, vec![*block.hash()]); + } + + env.process_partial_encoded_chunks(); + for client_idx in 0..env.clients.len() { + env.process_shards_manager_responses_and_finish_processing_blocks(client_idx); + } + env.propagate_chunk_state_witnesses_and_endorsements(false); + + // Verify heads + let heads = env + .clients + .iter() + .map(|client| client.chain.head().unwrap().last_block_hash) + .collect::<HashSet<_>>(); + assert_eq!(heads.len(), 1, "All clients should have the same head"); + } + + // Produce two more blocks (`block1` and `block2`), but don't send them to the `excluded_validator`. + // The `excluded_validator` will receive a chunk witness for the chunk in `block2`, but it won't + // have `block1`, so it will become an orphaned chunk state witness. + let tip = env.clients[0].chain.head().unwrap(); + + let block1_producer = env.get_block_producer_at_offset(&tip, 1); + let block2_producer = env.get_block_producer_at_offset(&tip, 2); + let block2_chunk_producer = env.get_chunk_producer_at_offset(&tip, 2, 0); + + // The excluded validator shouldn't produce any blocks or chunks in the next two blocks. + // There are 4 validators and at most 3 of them are ruled out, so there's always at least + // one that fits all the criteria and the `unwrap()` won't fail. + let excluded_validator = accounts + .into_iter() + .find(|acc| { + acc != &block1_producer && acc != &block2_producer && acc != &block2_chunk_producer + }) + .unwrap(); + let excluded_validator_idx = env.get_client_index(&excluded_validator); + let clients_without_excluded = + (0..env.clients.len()).filter(|idx| *idx != excluded_validator_idx); + + tracing::info!(target:"test", "Producing block1 at height {}", tip.height + 1); + let block1 = env.client(&block1_producer).produce_block(tip.height + 1).unwrap().unwrap(); + assert_eq!( + block1.chunks()[0].height_created(), + block1.header().height(), + "There should be no missing chunks."
+ ); + + for client_idx in clients_without_excluded.clone() { + let blocks_processed = env.clients[client_idx] + .process_block_test(block1.clone().into(), Provenance::NONE) + .unwrap(); + assert_eq!(blocks_processed, vec![*block1.hash()]); + } + env.process_partial_encoded_chunks(); + for client_idx in 0..env.clients.len() { + env.process_shards_manager_responses(client_idx); + } + + // At this point the chunk producer for the chunk belonging to block2 produces + // the chunk and sends out a witness for it. Let's intercept the witness + // and process it on all validators except for `excluded_validator`. + // The witness isn't processed on `excluded_validator` to give users of + // `setup_orphan_witness_test()` full control over the events. + let mut witness_opt = None; + let network_adapter = + env.network_adapters[env.get_client_index(&block2_chunk_producer)].clone(); + network_adapter.handle_filtered(|request| match request { + PeerManagerMessageRequest::NetworkRequests(NetworkRequests::ChunkStateWitness( + account_ids, + state_witness, + )) => { + let mut witness_processing_done_waiters: Vec<ProcessingDoneWaiter> = Vec::new(); + for account_id in account_ids.iter().filter(|acc| **acc != excluded_validator) { + let processing_done_tracker = ProcessingDoneTracker::new(); + witness_processing_done_waiters.push(processing_done_tracker.make_waiter()); + env.client(account_id) + .process_chunk_state_witness( + state_witness.clone(), + PeerId::random(), + Some(processing_done_tracker), + ) + .unwrap(); + } + for waiter in witness_processing_done_waiters { + waiter.wait(); + } + witness_opt = Some(state_witness); + None + } + _ => Some(request), + }); + let witness = witness_opt.unwrap(); + + env.propagate_chunk_endorsements(false); + + tracing::info!(target:"test", "Producing block2 at height {}", tip.height + 2); + let block2 = env.client(&block2_producer).produce_block(tip.height + 2).unwrap().unwrap(); + assert_eq!( + block2.chunks()[0].height_created(), + block2.header().height(), + "There should be no missing chunks." + ); + assert_eq!(witness.inner.chunk_header.chunk_hash(), block2.chunks()[0].chunk_hash()); + + for client_idx in clients_without_excluded { + let blocks_processed = env.clients[client_idx] + .process_block_test(block2.clone().into(), Provenance::NONE) + .unwrap(); + assert_eq!(blocks_processed, vec![*block2.hash()]); + } + + env.process_partial_encoded_chunks(); + for client_idx in 0..env.clients.len() { + env.process_shards_manager_responses_and_finish_processing_blocks(client_idx); + } + + OrphanWitnessTestEnv { + env, + block1, + block2, + witness, + excluded_validator, + excluded_validator_idx, + chunk_producer: block2_chunk_producer, + } +} + +/// Test that a valid orphan witness is correctly processed once the required block arrives. +#[test] +fn test_orphan_witness_valid() { + init_integration_logger(); + + if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) { + println!("Test not applicable without StatelessValidation enabled"); + return; + } + + let OrphanWitnessTestEnv { + mut env, + block1, + block2, + witness, + excluded_validator, + excluded_validator_idx, + .. + } = setup_orphan_witness_test(); + + // `excluded_validator` receives a witness for the chunk belonging to `block2`, but it doesn't have `block1`. + // The witness should become an orphaned witness and it should be saved to the orphan pool.
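+ // Expected flow (a sketch based on the methods in this diff): the call below returns Ok and
+ // stashes the witness in the OrphanStateWitnessPool; processing `block1` afterwards triggers
+ // process_ready_orphan_chunk_state_witnesses(), which re-validates the witness and sends out
+ // the endorsement that is awaited at the end of this test.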
+ env.client(&excluded_validator) + .process_chunk_state_witness(witness, PeerId::random(), None) + .unwrap(); + + let block_processed = env + .client(&excluded_validator) + .process_block_test(block1.clone().into(), Provenance::NONE) + .unwrap(); + assert_eq!(block_processed, vec![*block1.hash()]); + + // After processing `block1`, `excluded_validator` should process the orphaned witness for the chunk belonging to `block2` + // and it should send out an endorsement for this chunk. This happens asynchronously, so we have to wait for it. + env.wait_for_chunk_endorsement(excluded_validator_idx, &block2.chunks()[0].chunk_hash()) + .unwrap(); +} + +#[test] +fn test_orphan_witness_bad_signature() { + init_integration_logger(); + + if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) { + println!("Test not applicable without StatelessValidation enabled"); + return; + } + + let OrphanWitnessTestEnv { mut env, mut witness, excluded_validator, .. } = + setup_orphan_witness_test(); + + // Modify the witness to contain an invalid signature + witness.signature = Signature::default(); + + let error = env + .client(&excluded_validator) + .process_chunk_state_witness(witness, PeerId::random(), None) + .unwrap_err(); + let error_message = format!("{error}").to_lowercase(); + tracing::info!(target:"test", "Error message: {}", error_message); + assert!(error_message.contains("invalid signature")); +} + +#[test] +fn test_orphan_witness_signature_from_wrong_peer() { + init_integration_logger(); + + if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) { + println!("Test not applicable without StatelessValidation enabled"); + return; + } + + let OrphanWitnessTestEnv { mut env, mut witness, excluded_validator, .. } = + setup_orphan_witness_test(); + + // Sign the witness using another validator's key. + // Only witnesses from the chunk producer that produced this witness should be accepted. + resign_witness(&mut witness, env.client(&excluded_validator)); + + let error = env + .client(&excluded_validator) + .process_chunk_state_witness(witness, PeerId::random(), None) + .unwrap_err(); + let error_message = format!("{error}").to_lowercase(); + tracing::info!(target:"test", "Error message: {}", error_message); + assert!(error_message.contains("invalid signature")); +} + +#[test] +fn test_orphan_witness_invalid_shard_id() { + init_integration_logger(); + + if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) { + println!("Test not applicable without StatelessValidation enabled"); + return; + } + + let OrphanWitnessTestEnv { mut env, mut witness, excluded_validator, chunk_producer, .. 
} = + setup_orphan_witness_test(); + + // Set invalid shard_id in the witness header + modify_witness_header_inner(&mut witness, |header| header.shard_id = 10000000); + resign_witness(&mut witness, env.client(&chunk_producer)); + + // The witness should be rejected + let error = env + .client(&excluded_validator) + .process_chunk_state_witness(witness, PeerId::random(), None) + .unwrap_err(); + let error_message = format!("{error}").to_lowercase(); + tracing::info!(target:"test", "Error message: {}", error_message); + assert!(error_message.contains("shard")); +} + +#[test] +fn test_orphan_witness_too_large() { + init_integration_logger(); + + if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) { + println!("Test not applicable without StatelessValidation enabled"); + return; + } + + let OrphanWitnessTestEnv { mut env, mut witness, excluded_validator, chunk_producer, .. } = + setup_orphan_witness_test(); + + // Modify the witness to be larger than the allowed limit + let dummy_merkle_path_item = + MerklePathItem { hash: CryptoHash::default(), direction: Direction::Left }; + let max_size_usize: usize = MAX_ORPHAN_WITNESS_SIZE.as_u64().try_into().unwrap(); + let items_count = max_size_usize / std::mem::size_of::<MerklePathItem>() + 1; + let big_path = vec![dummy_merkle_path_item; items_count]; + let big_receipt_proof = + ReceiptProof(Vec::new(), ShardProof { from_shard_id: 0, to_shard_id: 0, proof: big_path }); + witness.inner.source_receipt_proofs.insert(ChunkHash::default(), big_receipt_proof); + resign_witness(&mut witness, env.client(&chunk_producer)); + + // The witness should not be saved to the pool, as it's too big + let outcome = env.client(&excluded_validator).handle_orphan_state_witness(witness).unwrap(); + assert!(matches!(outcome, HandleOrphanWitnessOutcome::TooBig(_))) +} + +/// Witnesses which are too far from the chain head should not be saved to the orphan pool +#[test] +fn test_orphan_witness_far_from_head() { + init_integration_logger(); + + if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) { + println!("Test not applicable without StatelessValidation enabled"); + return; + } + + let OrphanWitnessTestEnv { + mut env, + mut witness, + chunk_producer, + block1, + excluded_validator, + .. + } = setup_orphan_witness_test(); + + let bad_height = 10000; + modify_witness_header_inner(&mut witness, |header| header.height_created = bad_height); + resign_witness(&mut witness, env.client(&chunk_producer)); + + let outcome = env.client(&excluded_validator).handle_orphan_state_witness(witness).unwrap(); + assert_eq!( + outcome, + HandleOrphanWitnessOutcome::TooFarFromHead { + witness_height: bad_height, + head_height: block1.header().height() - 1 + } + ); +} + +// Test that orphan witnesses are only partially validated - an orphan witness with invalid +// `source_receipt_proofs` will be accepted and saved into the pool, as there's no way to validate +// this field without the previous block. +#[test] +fn test_orphan_witness_not_fully_validated() { + init_integration_logger(); + + if !checked_feature!("stable", StatelessValidationV0, PROTOCOL_VERSION) { + println!("Test not applicable without StatelessValidation enabled"); + return; + } + + let OrphanWitnessTestEnv { mut env, mut witness, chunk_producer, excluded_validator, ..
} = + setup_orphan_witness_test(); + + // Make the witness invalid in a way that won't be detected during orphan witness validation + witness.inner.source_receipt_proofs.insert( + ChunkHash::default(), + ReceiptProof( + vec![], + ShardProof { from_shard_id: 100230230, to_shard_id: 383939, proof: vec![] }, + ), + ); + resign_witness(&mut witness, env.client(&chunk_producer)); + + // The witness should be accepted and saved into the pool, even though it's invalid. + // There is no way to fully validate an orphan witness, so this is the correct behavior. + // The witness will later be fully validated when the required block arrives. + env.client(&excluded_validator) + .process_chunk_state_witness(witness, PeerId::random(), None) + .unwrap(); +} + +fn modify_witness_header_inner( + witness: &mut ChunkStateWitness, + f: impl FnOnce(&mut ShardChunkHeaderInnerV2), +) { + match &mut witness.inner.chunk_header { + ShardChunkHeader::V3(header) => match &mut header.inner { + ShardChunkHeaderInner::V2(header_inner) => f(header_inner), + _ => panic!(), + }, + _ => panic!(), + }; +} + +fn resign_witness(witness: &mut ChunkStateWitness, signer: &Client) { + witness.signature = + signer.validator_signer.as_ref().unwrap().sign_chunk_state_witness(&witness.inner).0; +} diff --git a/integration-tests/src/tests/client/features/stateless_validation.rs b/integration-tests/src/tests/client/features/stateless_validation.rs index 2946c19b0bb..09913cbe60d 100644 --- a/integration-tests/src/tests/client/features/stateless_validation.rs +++ b/integration-tests/src/tests/client/features/stateless_validation.rs @@ -1,8 +1,6 @@ use near_epoch_manager::{EpochManager, EpochManagerAdapter}; use near_primitives::network::PeerId; -use near_primitives::sharding::{ShardChunkHeader, ShardChunkHeaderV3}; -use near_primitives::stateless_validation::{ChunkStateWitness, ChunkStateWitnessInner}; -use near_primitives::validator_signer::EmptyValidatorSigner; +use near_primitives::stateless_validation::ChunkStateWitness; use near_store::test_utils::create_test_store; use nearcore::config::GenesisExt; use rand::rngs::StdRng; @@ -14,7 +12,6 @@ use near_chain_configs::{Genesis, GenesisConfig, GenesisRecords}; use near_client::test_utils::TestEnv; use near_crypto::{InMemorySigner, KeyType}; use near_o11y::testonly::init_integration_logger; -use near_primitives::block::Tip; use near_primitives::epoch_manager::AllEpochConfigTestOverrides; use near_primitives::num_rational::Rational32; use near_primitives::shard_layout::ShardLayout; @@ -169,7 +166,7 @@ fn run_chunk_validation_test(seed: u64, prob_missing_chunk: f64) { let _ = env.clients[0].process_tx(tx, false, false); } - let block_producer = get_block_producer(&env, &tip, 1); + let block_producer = env.get_block_producer_at_offset(&tip, 1); tracing::debug!( target: "stateless_validation", "Producing block at height {} by {}", tip.height + 1, block_producer @@ -243,17 +240,6 @@ fn test_chunk_validation_high_missing_chunks() { run_chunk_validation_test(44, 0.81); } -/// Returns the block producer for the height of head + height_offset. 
-fn get_block_producer(env: &TestEnv, head: &Tip, height_offset: u64) -> AccountId { - let client = &env.clients[0]; - let epoch_manager = &client.epoch_manager; - let parent_hash = &head.last_block_hash; - let epoch_id = epoch_manager.get_epoch_id_from_prev_block(parent_hash).unwrap(); - let height = head.height + height_offset; - let block_producer = epoch_manager.get_block_producer(&epoch_id, height).unwrap(); - block_producer -} - #[test] fn test_protocol_upgrade_81() { init_integration_logger(); @@ -346,36 +332,7 @@ fn test_chunk_state_witness_bad_shard_id() { // Create a dummy ChunkStateWitness with an invalid shard_id let previous_block = env.clients[0].chain.head().unwrap().prev_block_hash; let invalid_shard_id = 1000000000; - - let chunk_header = ShardChunkHeader::V3(ShardChunkHeaderV3::new( - previous_block, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - upper_height, - invalid_shard_id, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - Default::default(), - Default::default(), - &EmptyValidatorSigner::default(), - )); - let witness = ChunkStateWitness { - inner: ChunkStateWitnessInner::new( - chunk_header, - Default::default(), - Default::default(), - Default::default(), - Default::default(), - Default::default(), - Default::default(), - Default::default(), - ), - signature: Default::default(), - }; + let witness = ChunkStateWitness::new_dummy(upper_height, invalid_shard_id, previous_block); // Client should reject this ChunkStateWitness and the error message should mention "shard" tracing::info!(target: "test", "Processing invalid ChunkStateWitness"); diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index 1137f3f3e5a..7b525b5b46a 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -5,14 +5,14 @@ use near_chain_configs::{ default_enable_multiline_logging, default_epoch_sync_enabled, default_header_sync_expected_height_per_second, default_header_sync_initial_timeout, default_header_sync_progress_timeout, default_header_sync_stall_ban_timeout, - default_log_summary_period, default_produce_chunk_add_transactions_time_limit, - default_state_sync, default_state_sync_enabled, default_state_sync_timeout, - default_sync_check_period, default_sync_height_threshold, default_sync_step_period, - default_transaction_pool_size_limit, default_trie_viewer_state_size_limit, - default_tx_routing_height_horizon, default_view_client_threads, - default_view_client_throttle_period, get_initial_supply, ChunkDistributionNetworkConfig, - ClientConfig, GCConfig, Genesis, GenesisConfig, GenesisValidationMode, LogSummaryStyle, - MutableConfigValue, ReshardingConfig, StateSyncConfig, + default_log_summary_period, default_orphan_state_witness_pool_size, + default_produce_chunk_add_transactions_time_limit, default_state_sync, + default_state_sync_enabled, default_state_sync_timeout, default_sync_check_period, + default_sync_height_threshold, default_sync_step_period, default_transaction_pool_size_limit, + default_trie_viewer_state_size_limit, default_tx_routing_height_horizon, + default_view_client_threads, default_view_client_throttle_period, get_initial_supply, + ChunkDistributionNetworkConfig, ClientConfig, GCConfig, Genesis, GenesisConfig, + GenesisValidationMode, LogSummaryStyle, MutableConfigValue, ReshardingConfig, StateSyncConfig, }; use near_config_utils::{ValidationError, ValidationErrors}; use near_crypto::{InMemorySigner, KeyFile, KeyType, PublicKey, Signer}; @@ -322,6 +322,10 @@ pub 
struct Config { /// latency due to the need of requesting chunks over the peer-to-peer network. #[serde(skip_serializing_if = "Option::is_none")] pub chunk_distribution_network: Option<ChunkDistributionNetworkConfig>, + /// OrphanStateWitnessPool keeps instances of ChunkStateWitness which can't be processed + /// because the previous block isn't available. The witnesses wait in the pool until the + /// required block appears. This field controls how many witnesses can be stored in the pool. + pub orphan_state_witness_pool_size: usize, } fn is_false(value: &bool) -> bool { @@ -367,6 +371,7 @@ impl Default for Config { produce_chunk_add_transactions_time_limit: default_produce_chunk_add_transactions_time_limit(), chunk_distribution_network: None, + orphan_state_witness_pool_size: default_orphan_state_witness_pool_size(), } } } @@ -681,6 +686,7 @@ impl NearConfig { "produce_chunk_add_transactions_time_limit", ), chunk_distribution_network: config.chunk_distribution_network, + orphan_state_witness_pool_size: config.orphan_state_witness_pool_size, }, network_config: NetworkConfig::new( config.network,
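To make the new knob concrete, here is a minimal sketch of the intended behavior, assuming only what this diff shows (the `Default` impl of `Config` and the verbatim copy into `ClientConfig`); the override value is hypothetical:

    // Hedged sketch, not part of the diff itself.
    let mut config = Config::default();
    // The default comes from default_orphan_state_witness_pool_size(), i.e. 25.
    assert_eq!(config.orphan_state_witness_pool_size, 25);
    // An operator running more shards might raise it (hypothetical value):
    config.orphan_state_witness_pool_size = 40;
    // NearConfig::new then copies the value into ClientConfig unchanged.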