diff --git a/chain/chain/src/block_processing_utils.rs b/chain/chain/src/block_processing_utils.rs index a21b332f48d..5b4a3ab37e2 100644 --- a/chain/chain/src/block_processing_utils.rs +++ b/chain/chain/src/block_processing_utils.rs @@ -17,6 +17,17 @@ pub(crate) const MAX_PROCESSING_BLOCKS: usize = 5; /// Contains information from preprocessing a block pub(crate) struct BlockPreprocessInfo { + /// This field has two related but actually different meanings. For the first block of an + /// epoch, this will be set to false if we need to download state for shards we'll track in + /// the future but don't track currently. This implies the first meaning, which is that if + /// this is true, then we are ready to apply all chunks and update flat state for shards + /// we'll track in this and the next epoch. This comes into play when we decide what ApplyChunksMode + /// to pass to Chain::apply_chunks_preprocessing(). + /// The other meaning is that the catchup code should process this block. When the state sync sync_hash + /// is the first block of the epoch, these two meanings are the same. But if the sync_hash is moved forward + /// in order to sync the current epoch's state instead of last epoch's, this field being false no longer implies + /// that we want to apply this block during catchup, so some care is needed to ensure we start catchup at the right + /// point in Client::run_catchup() pub(crate) is_caught_up: bool, pub(crate) state_sync_info: Option, pub(crate) incoming_receipts: HashMap>, diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 5ca86cf3d8b..489c26c0b9d 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -65,7 +65,7 @@ use near_primitives::sandbox::state_patch::SandboxStatePatch; use near_primitives::shard_layout::{account_id_to_shard_id, ShardLayout, ShardUId}; use near_primitives::sharding::{ ChunkHash, ChunkHashHeight, EncodedShardChunk, ReceiptList, ReceiptProof, ShardChunk, - ShardChunkHeader, ShardInfo, ShardProof, StateSyncInfo, + ShardChunkHeader, ShardProof, StateSyncInfo, }; use near_primitives::state_part::PartId; use near_primitives::state_sync::{ @@ -306,6 +306,14 @@ pub enum VerifyBlockHashAndSignatureResult { CannotVerifyBecauseBlockIsOrphan, } +/// returned by should_make_or_delete_snapshot(), this type tells what we should do to the state snapshot +enum SnapshotAction { + /// Make a new snapshot. Contains the prev_hash of the sync_hash that is used for state sync + MakeSnapshot(CryptoHash), + DeleteSnapshot, + None, +} + impl Chain { /// Builds genesis block and chunks from the current configuration obtained through the arguments. pub fn make_genesis_block( @@ -768,9 +776,9 @@ impl Chain { fn get_state_sync_info( &self, me: &Option, - block: &Block, + epoch_first_block: &Block, ) -> Result, Error> { - let prev_hash = *block.header().prev_hash(); + let prev_hash = *epoch_first_block.header().prev_hash(); let shards_to_state_sync = Chain::get_shards_to_state_sync( self.epoch_manager.as_ref(), &self.shard_tracker, @@ -779,7 +787,9 @@ impl Chain { )?; let prev_block = self.get_block(&prev_hash)?; - if prev_block.chunks().len() != block.chunks().len() && !shards_to_state_sync.is_empty() { + if prev_block.chunks().len() != epoch_first_block.chunks().len() + && !shards_to_state_sync.is_empty() + { // Currently, the state sync algorithm assumes that the number of chunks do not change // between the epoch being synced to and the last epoch. 
// For example, if shard layout changes at the beginning of epoch T, validators @@ -791,7 +801,7 @@ impl Chain { // the validator will not have the states ready so it will halt. error!( "Cannot download states for epoch {:?} because sharding just changed. I'm {:?}", - block.header().epoch_id(), + epoch_first_block.header().epoch_id(), me ); debug_assert!(false); @@ -800,21 +810,19 @@ impl Chain { Ok(None) } else { debug!(target: "chain", "Downloading state for {:?}, I'm {:?}", shards_to_state_sync, me); - let epoch_id = block.header().epoch_id(); + let epoch_id = epoch_first_block.header().epoch_id(); let shard_layout = self.epoch_manager.get_shard_layout(&epoch_id)?; - let state_sync_info = StateSyncInfo { - epoch_tail_hash: *block.header().hash(), - shards: shards_to_state_sync - .iter() - .map(|shard_id| { - let shard_index = shard_layout.get_shard_index(*shard_id); - let chunk = &prev_block.chunks()[shard_index]; - ShardInfo(*shard_id, chunk.chunk_hash()) - }) - .collect(), - }; - + let protocol_version = self.epoch_manager.get_epoch_protocol_version(epoch_id)?; + // Note that this block is the first block in an epoch because this function is only called + // in get_catchup_and_state_sync_infos() when that is the case. + let state_sync_info = StateSyncInfo::new( + protocol_version, + *epoch_first_block.header().hash(), + &prev_block, + &shard_layout, + &shards_to_state_sync, + ); Ok(Some(state_sync_info)) } } @@ -1691,6 +1699,7 @@ impl Chain { let prev_hash = *header.prev_hash(); let prev_block = self.get_block(&prev_hash)?; + // TODO(current_epoch_state_sync): fix this when syncing to the current epoch's state // The congestion control added a dependency on the prev block when // applying chunks in a block. This means that we need to keep the // blocks at sync hash, prev hash and prev prev hash. @@ -2507,6 +2516,84 @@ impl Chain { ) } + // TODO(current_epoch_state_sync): move state sync related code to state sync files + /// Find the hash that should be used as the reference point when requesting state sync + /// headers and parts from other nodes for the epoch the block with hash `block_hash` belongs to. + /// If syncing to the state of that epoch (the new way), this block hash might not yet be known, + /// in which case this returns None. If syncing to the state of the previous epoch (the old way), + /// it's the hash of the first block in that epoch. + pub fn get_sync_hash(&self, block_hash: &CryptoHash) -> Result, Error> { + if block_hash == self.genesis().hash() { + // We shouldn't be trying to sync state from before the genesis block + return Ok(None); + } + let header = self.get_block_header(block_hash)?; + let protocol_version = self.epoch_manager.get_epoch_protocol_version(header.epoch_id())?; + if ProtocolFeature::StateSyncHashUpdate.enabled(protocol_version) { + self.get_current_epoch_sync_hash(block_hash) + } else { + // In the first epoch, it doesn't make sense to sync state to the previous epoch. + if header.epoch_id() == &EpochId::default() { + return Ok(None); + } + self.get_previous_epoch_sync_hash(block_hash).map(Some) + } + } + + /// Find the hash of the first block in the epoch the block with hash `block_hash` belongs to. 
+ /// This is the "sync_hash" that will be used as a reference point when state syncing the previous epoch's state + fn get_previous_epoch_sync_hash(&self, block_hash: &CryptoHash) -> Result { + Ok(*self.epoch_manager.get_block_info(block_hash)?.epoch_first_block()) + } + + /// Returns the first block for which at least two new chunks have been produced for every shard in the epoch + /// This is the "sync_hash" that will be used as a reference point when state syncing the current epoch's state + // TODO(current_epoch_state_sync): remove this and replace it with a single more efficient function that somehow + // keeps track of the number of new chunks per shard so for, like in `NewChunkTracker::find_sync_hash()`, and then + // only implement this in one place. + fn get_current_epoch_sync_hash( + &self, + block_hash: &CryptoHash, + ) -> Result, Error> { + let epoch_start = self.get_previous_epoch_sync_hash(block_hash)?; + let mut header = self.get_block_header(&epoch_start)?; + + let shard_layout = self.epoch_manager.get_shard_layout(header.epoch_id())?; + let shard_ids = self.epoch_manager.shard_ids(header.epoch_id())?; + let mut num_new_chunks: HashMap<_, _> = + shard_ids.iter().map(|shard_id| (*shard_id, 0)).collect(); + + loop { + let next_hash = match self.chain_store().get_next_block_hash(header.hash()) { + Ok(h) => h, + Err(Error::DBNotFoundErr(_)) => return Ok(None), + Err(e) => return Err(e), + }; + header = self.get_block_header(&next_hash)?; + + let mut done = true; + for (shard_id, num_new_chunks) in num_new_chunks.iter_mut() { + let shard_index = shard_layout.get_shard_index(*shard_id); + let Some(included) = header.chunk_mask().get(shard_index) else { + return Err(Error::Other(format!( + "can't get shard {} in chunk mask for block {}", + shard_id, + header.hash() + ))); + }; + if *included { + *num_new_chunks += 1; + } + if *num_new_chunks < 2 { + done = false; + } + } + if done { + return Ok(Some(*header.hash())); + } + } + } + /// Computes ShardStateSyncResponseHeader. pub fn compute_state_response_header( &self, @@ -2532,10 +2619,9 @@ impl Chain { // The chunk was applied at height `chunk_header.height_included`. // Getting the `current` state. + // TODO(current_epoch_state_sync): check that the sync block is what we would expect. 
So, either the first + // block of an epoch, or the first block where there have been two new chunks in the epoch let sync_prev_block = self.get_block(sync_block_header.prev_hash())?; - if sync_block_epoch_id == sync_prev_block.header().epoch_id() { - return Err(sync_hash_not_first_hash(sync_hash)); - } let shard_layout = self.epoch_manager.get_shard_layout(&sync_block_epoch_id)?; let prev_epoch_id = sync_prev_block.header().epoch_id(); @@ -2746,9 +2832,6 @@ impl Chain { return Err(shard_id_out_of_bounds(shard_id)); } let prev_block = self.get_block(header.prev_hash())?; - if epoch_id == prev_block.header().epoch_id() { - return Err(sync_hash_not_first_hash(sync_hash)); - } let shard_index = shard_layout.get_shard_index(shard_id); let state_root = prev_block .chunks() @@ -3294,6 +3377,9 @@ impl Chain { &mut self, me: &Option, epoch_first_block: &CryptoHash, + // TODO(current_epoch_state_sync): remove the ones not in affected_blocks by breadth first searching from `epoch_first_block` and adding + // descendant blocks to the search when they're not equal to this hash, and then removing everything we see in that search + _catchup_start_block: &CryptoHash, block_processing_artifacts: &mut BlockProcessingArtifact, apply_chunks_done_sender: Option>, affected_blocks: &[CryptoHash], @@ -3846,25 +3932,29 @@ impl Chain { } /// Function to create or delete a snapshot if necessary. + /// TODO: this function calls head() inside of start_process_block_impl(), consider moving this to be called right after HEAD gets updated fn process_snapshot(&mut self) -> Result<(), Error> { - let (make_snapshot, delete_snapshot) = self.should_make_or_delete_snapshot()?; - if !make_snapshot && !delete_snapshot { - return Ok(()); - } + let snapshot_action = self.should_make_or_delete_snapshot()?; let Some(snapshot_callbacks) = &self.snapshot_callbacks else { return Ok(()) }; - if make_snapshot { - let head = self.head()?; - let prev_hash = head.prev_block_hash; - let epoch_height = self.epoch_manager.get_epoch_height_from_prev_block(&prev_hash)?; - let shard_layout = &self.epoch_manager.get_shard_layout_from_prev_block(&prev_hash)?; - let shard_uids = shard_layout.shard_uids().enumerate().collect(); - let last_block = self.get_block(&head.last_block_hash)?; - let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback; - make_snapshot_callback(prev_hash, epoch_height, shard_uids, last_block); - } else if delete_snapshot { - let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback; - delete_snapshot_callback(); - } + match snapshot_action { + SnapshotAction::MakeSnapshot(prev_hash) => { + let prev_block = self.get_block(&prev_hash)?; + let prev_prev_hash = prev_block.header().prev_hash(); + let epoch_height = + self.epoch_manager.get_epoch_height_from_prev_block(prev_prev_hash)?; + let shard_layout = + &self.epoch_manager.get_shard_layout_from_prev_block(prev_prev_hash)?; + let shard_uids = shard_layout.shard_uids().enumerate().collect(); + + let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback; + make_snapshot_callback(*prev_prev_hash, epoch_height, shard_uids, prev_block); + } + SnapshotAction::DeleteSnapshot => { + let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback; + delete_snapshot_callback(); + } + SnapshotAction::None => {} + }; Ok(()) } @@ -3894,32 +3984,64 @@ impl Chain { /// Function to check whether we need to create a new snapshot while processing the current block /// Note that this functions is called as a part of block preprocesing, 
so the head is not updated to current block - fn should_make_or_delete_snapshot(&mut self) -> Result<(bool, bool), Error> { + fn should_make_or_delete_snapshot(&mut self) -> Result { // head value is that of the previous block, i.e. curr_block.prev_hash let head = self.head()?; if head.prev_block_hash == CryptoHash::default() { // genesis block, do not snapshot - return Ok((false, false)); + return Ok(SnapshotAction::None); } let is_epoch_boundary = self.epoch_manager.is_next_block_epoch_start(&head.last_block_hash)?; let will_shard_layout_change = self.epoch_manager.will_shard_layout_change(&head.last_block_hash)?; + let protocol_version = self.epoch_manager.get_epoch_protocol_version(&head.epoch_id)?; + let tries = self.runtime_adapter.get_tries(); let snapshot_config = tries.state_snapshot_config(); - let make_snapshot = match snapshot_config.state_snapshot_type { - // For every epoch, we snapshot if the next block would be in a different epoch - StateSnapshotType::EveryEpoch => is_epoch_boundary, + match snapshot_config.state_snapshot_type { + // For every epoch, we snapshot if the next block is the state sync "sync_hash" block + StateSnapshotType::EveryEpoch => { + if !ProtocolFeature::StateSyncHashUpdate.enabled(protocol_version) { + if is_epoch_boundary { + // Here we return head.last_block_hash as the prev_hash of the first block of the next epoch + Ok(SnapshotAction::MakeSnapshot(head.last_block_hash)) + } else { + Ok(SnapshotAction::None) + } + } else { + // TODO(current_epoch_state_sync): this needs to be fixed. can't be iterating over the whole chain inside of preprocess + // block like that if there are many missed chunks + let Some(sync_hash) = + self.get_current_epoch_sync_hash(&head.last_block_hash)? + else { + return Ok(SnapshotAction::None); + }; + if sync_hash == head.last_block_hash { + // note that here we're returning prev_block_hash instead of last_block_hash because in this case + // we can't detect the right sync hash until it is actually applied as the head block + Ok(SnapshotAction::MakeSnapshot(head.prev_block_hash)) + } else { + Ok(SnapshotAction::None) + } + } + } // For resharding only, we snapshot if next block would be in a different shard layout - StateSnapshotType::ForReshardingOnly => is_epoch_boundary && will_shard_layout_change, - }; - - // We need to delete the existing snapshot at the epoch boundary if we are not making a new snapshot - // This is useful for the next epoch after resharding where make_snapshot is false but it's an epoch boundary - let delete_snapshot = !make_snapshot && is_epoch_boundary; - - Ok((make_snapshot, delete_snapshot)) + StateSnapshotType::ForReshardingOnly => { + if is_epoch_boundary { + if will_shard_layout_change { + Ok(SnapshotAction::MakeSnapshot(head.last_block_hash)) + } else { + // We need to delete the existing snapshot at the epoch boundary if we are not making a new snapshot + // This is useful for the next epoch after resharding where we don't make a snapshot but it's an epoch boundary + Ok(SnapshotAction::DeleteSnapshot) + } + } else { + Ok(SnapshotAction::None) + } + } + } } } @@ -4002,12 +4124,6 @@ fn shard_id_out_of_bounds(shard_id: ShardId) -> Error { Error::InvalidStateRequest(format!("shard_id {shard_id:?} out of bounds").into()) } -fn sync_hash_not_first_hash(sync_hash: CryptoHash) -> Error { - Error::InvalidStateRequest( - format!("sync_hash {sync_hash:?} is not the first hash of the epoch").into(), - ) -} - /// We want to guarantee that transactions are only applied once for each shard, /// even though 
apply_chunks may be called twice, once with /// ApplyChunksMode::NotCaughtUp once with ApplyChunksMode::CatchingUp. Note @@ -4281,24 +4397,16 @@ impl Chain { self.invalid_blocks.contains(hash) } - /// Check that sync_hash is the first block of an epoch. + /// Check that sync_hash matches the one we expect for the epoch containing that block. pub fn check_sync_hash_validity(&self, sync_hash: &CryptoHash) -> Result { // It's important to check that Block exists because we will sync with it. // Do not replace with `get_block_header()`. - let sync_block = self.get_block(sync_hash)?; - let prev_hash = *sync_block.header().prev_hash(); - let is_first_block_of_epoch = self.epoch_manager.is_next_block_epoch_start(&prev_hash); - tracing::debug!( - target: "chain", - ?sync_hash, - ?prev_hash, - sync_hash_epoch_id = ?sync_block.header().epoch_id(), - sync_hash_next_epoch_id = ?sync_block.header().next_epoch_id(), - ?is_first_block_of_epoch, - "check_sync_hash_validity"); + let _sync_block = self.get_block(sync_hash)?; - // If sync_hash is not on the Epoch boundary, it's malicious behavior - Ok(is_first_block_of_epoch?) + // TODO(current_epoch_state_sync): replace this with a more efficient lookup. In the case + // we're syncing to the current epoch, this iterates over blocks in the epoch + let good_sync_hash = self.get_sync_hash(sync_hash)?; + Ok(good_sync_hash.as_ref() == Some(sync_hash)) } /// Get transaction result for given hash of transaction or receipt id @@ -4512,7 +4620,9 @@ pub struct ChunkStateWitnessMessage { } /// Helper to track blocks catch up -/// Lifetime of a block_hash is as follows: +/// Starting from the first block we want to apply after syncing state (so either the first block +/// of an epoch, or a couple blocks after that, if syncing the current epoch's state) the lifetime +/// of a block_hash is as follows: /// 1. It is added to pending blocks, either as first block of an epoch or because we (post) /// processed previous block /// 2. Block is preprocessed and scheduled for processing in sync jobs actor. Block hash @@ -4523,7 +4633,7 @@ pub struct ChunkStateWitnessMessage { /// Otherwise results are committed, block is moved to done blocks and any blocks that /// have this block as previous are added to pending pub struct BlocksCatchUpState { - /// Hash of first block of an epoch + /// Hash of the block where catchup will start from pub first_block_hash: CryptoHash, /// Epoch id pub epoch_id: EpochId, diff --git a/chain/chain/src/garbage_collection.rs b/chain/chain/src/garbage_collection.rs index 79647667313..58dc584b615 100644 --- a/chain/chain/src/garbage_collection.rs +++ b/chain/chain/src/garbage_collection.rs @@ -318,6 +318,7 @@ impl ChainStore { let header = self.get_block_header(&sync_hash)?; let prev_hash = *header.prev_hash(); let sync_height = header.height(); + // TODO(current_epoch_state_sync): fix this when syncing to the current epoch's state // The congestion control added a dependency on the prev block when // applying chunks in a block. This means that we need to keep the // blocks at sync hash, prev hash and prev prev hash. The heigh of that diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs index f99c931dd02..ecf60cf6dd2 100644 --- a/chain/chain/src/store/mod.rs +++ b/chain/chain/src/store/mod.rs @@ -2561,7 +2561,7 @@ impl<'a> ChainStoreUpdate<'a> { for state_sync_info in self.add_state_sync_infos.drain(..) 
{ store_update.set_ser( DBCol::StateDlInfos, - state_sync_info.epoch_tail_hash.as_ref(), + state_sync_info.epoch_first_block().as_ref(), &state_sync_info, )?; } diff --git a/chain/chain/src/store_validator/validate.rs b/chain/chain/src/store_validator/validate.rs index 2f7950ed11e..7c9cf3d45e8 100644 --- a/chain/chain/src/store_validator/validate.rs +++ b/chain/chain/src/store_validator/validate.rs @@ -739,8 +739,8 @@ pub(crate) fn state_sync_info_valid( state_sync_info: &StateSyncInfo, ) -> Result<(), StoreValidatorError> { check_discrepancy!( - state_sync_info.epoch_tail_hash, - *block_hash, + state_sync_info.epoch_first_block(), + block_hash, "Invalid StateSyncInfo stored" ); Ok(()) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index f3a1bed05d2..5b5704b4981 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -62,7 +62,8 @@ use near_primitives::merkle::{merklize, MerklePath, PartialMerkleTree}; use near_primitives::network::PeerId; use near_primitives::receipt::Receipt; use near_primitives::sharding::{ - EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader, + EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader, StateSyncInfo, + StateSyncInfoV1, }; use near_primitives::transaction::SignedTransaction; use near_primitives::types::chunk_extra::ChunkExtra; @@ -104,6 +105,17 @@ pub enum AdvProduceBlocksMode { OnlyValid, } +/// The state associated with downloading state for a shard this node will track in the +/// future but does not currently. +pub struct CatchupState { + /// Manages downloading the state. + pub state_sync: StateSync, + /// Keeps track of state downloads, and gets passed to `state_sync`. + pub sync_status: StateSyncStatus, + /// Manages going back to apply chunks after state has been downloaded. + pub catchup: BlocksCatchUpState, +} + pub struct Client { /// Adversarial controls - should be enabled only to test disruptive /// behaviour on chain. @@ -139,9 +151,12 @@ pub struct Client { /// Approvals for which we do not have the block yet pub pending_approvals: lru::LruCache>, + /// A mapping from an epoch that we know needs to be state synced for some shards + /// to a tracker that will find an appropriate sync_hash for state sync to that epoch + catchup_tracker: HashMap, /// A mapping from a block for which a state sync is underway for the next epoch, and the object /// storing the current status of the state sync and blocks catch up - pub catchup_state_syncs: HashMap, + pub catchup_state_syncs: HashMap, /// Keeps track of information needed to perform the initial Epoch Sync pub epoch_sync: EpochSync, /// Keeps track of syncing headers. 
@@ -224,6 +239,102 @@ pub struct ProduceChunkResult { pub transactions_storage_proof: Option, } +/// This keeps track of the number of new chunks seen in each shard since the block that was passed to new() +/// This whole thing could be replaced with a much simpler function that just computes the number of new chunks +/// in each shard from scratch every time we call it, but in the unlikely and unfortunate case where a shard +/// hasn't had any chunks for a very long time, it would end up being a nontrivial inefficiency to do that +/// every time run_catchup() is called +pub struct NewChunkTracker { + last_checked_hash: CryptoHash, + last_checked_height: BlockHeight, + num_new_chunks: HashMap, + sync_hash: Option, +} + +impl NewChunkTracker { + fn new( + first_block_hash: CryptoHash, + first_block_height: BlockHeight, + shard_ids: &[ShardId], + ) -> Self { + Self { + last_checked_hash: first_block_hash, + last_checked_height: first_block_height, + num_new_chunks: shard_ids.iter().map(|shard_id| (*shard_id, 0)).collect(), + sync_hash: None, + } + } + + // TODO(current_epoch_sync_hash): refactor this and use the same logic in get_current_epoch_sync_hash(). Ideally + // that function should go away (at least as it is now) in favor of a more efficient approach that we can call on + // every new block application + fn record_new_chunks( + &mut self, + epoch_manager: &dyn EpochManagerAdapter, + header: &BlockHeader, + ) -> Result { + let shard_layout = epoch_manager.get_shard_layout(header.epoch_id())?; + + let mut done = true; + for (shard_id, num_new_chunks) in self.num_new_chunks.iter_mut() { + let shard_index = shard_layout.get_shard_index(*shard_id); + let Some(included) = header.chunk_mask().get(shard_index) else { + return Err(Error::Other(format!( + "can't get shard {} in chunk mask for block {}", + shard_id, + header.hash() + ))); + }; + if *included { + *num_new_chunks += 1; + } + if *num_new_chunks < 2 { + done = false; + } + } + self.last_checked_hash = *header.hash(); + self.last_checked_height = header.height(); + Ok(done) + } + + fn find_sync_hash( + &mut self, + chain: &Chain, + epoch_manager: &dyn EpochManagerAdapter, + ) -> Result, Error> { + if let Some(sync_hash) = self.sync_hash { + return Ok(Some(sync_hash)); + } + + let final_head = chain.final_head()?; + + while self.last_checked_height < final_head.height { + let next_hash = match chain.chain_store().get_next_block_hash(&self.last_checked_hash) { + Ok(h) => h, + Err(near_chain_primitives::Error::DBNotFoundErr(_)) => { + return Err(Error::Other(format!( + "final head is #{} {} but get_next_block_hash(#{} {}) is not found", + final_head.height, + final_head.last_block_hash, + self.last_checked_height, + &self.last_checked_hash + ))); + } + Err(e) => return Err(e.into()), + }; + let next_header = chain.get_block_header(&next_hash)?; + let done = self.record_new_chunks(epoch_manager, &next_header)?; + if done { + // TODO(current_epoch_state_sync): check to make sure the epoch IDs are the same. If there are no new chunks in some shard in the epoch, + // this will be for an epoch ahead of this one + self.sync_hash = Some(next_hash); + break; + } + } + Ok(self.sync_hash) + } +} + impl Client { pub fn new( clock: Clock, @@ -371,6 +482,7 @@ impl Client { pending_approvals: lru::LruCache::new( NonZeroUsize::new(num_block_producer_seats).unwrap(), ), + catchup_tracker: HashMap::new(), catchup_state_syncs: HashMap::new(), epoch_sync, header_sync, @@ -2458,6 +2570,57 @@ impl Client { Ok(false) } + /// Find the sync hash. 
Most of the time it will already be set in `state_sync_info`. If not, try to find it, + /// and set the corresponding field in `state_sync_info`. + fn get_catchup_sync_hash_v1( + &mut self, + state_sync_info: &mut StateSyncInfoV1, + epoch_first_block: &BlockHeader, + ) -> Result, Error> { + if state_sync_info.sync_hash.is_some() { + return Ok(state_sync_info.sync_hash); + } + + let new_chunk_tracker = match self.catchup_tracker.entry(*epoch_first_block.epoch_id()) { + std::collections::hash_map::Entry::Occupied(e) => e.into_mut(), + std::collections::hash_map::Entry::Vacant(e) => { + let shard_ids = self.epoch_manager.shard_ids(epoch_first_block.epoch_id())?; + e.insert(NewChunkTracker::new( + *epoch_first_block.hash(), + epoch_first_block.height(), + &shard_ids, + )) + } + }; + + if let Some(sync_hash) = + new_chunk_tracker.find_sync_hash(&self.chain, self.epoch_manager.as_ref())? + { + state_sync_info.sync_hash = Some(sync_hash); + let mut update = self.chain.mut_chain_store().store_update(); + // note that iterate_state_sync_infos() collects everything into a Vec, so we're not + // actually writing to the DB while actively iterating this column + update.add_state_sync_info(StateSyncInfo::V1(state_sync_info.clone())); + // TODO: would be nice to be able to propagate context up the call stack so we can just log + // once at the top with all the info. Otherwise this error will look very cryptic + update.commit()?; + } + Ok(state_sync_info.sync_hash) + } + + /// Find the sync hash. If syncing to the old epoch's state, it's always set. If syncing to + /// the current epoch's state, it might not yet be known, in which case we try to find it. + fn get_catchup_sync_hash( + &mut self, + state_sync_info: &mut StateSyncInfo, + epoch_first_block: &BlockHeader, + ) -> Result, Error> { + match state_sync_info { + StateSyncInfo::V0(info) => Ok(Some(info.sync_hash)), + StateSyncInfo::V1(info) => self.get_catchup_sync_hash_v1(info, epoch_first_block), + } + } + /// Walks through all the ongoing state syncs for future epochs and processes them pub fn run_catchup( &mut self, @@ -2469,17 +2632,27 @@ impl Client { let _span = debug_span!(target: "sync", "run_catchup").entered(); let me = signer.as_ref().map(|x| x.validator_id().clone()); - for (sync_hash, state_sync_info) in self.chain.chain_store().iterate_state_sync_infos()? { - assert_eq!(sync_hash, state_sync_info.epoch_tail_hash); + for (epoch_first_block, mut state_sync_info) in + self.chain.chain_store().iterate_state_sync_infos()? + { + assert_eq!(&epoch_first_block, state_sync_info.epoch_first_block()); + let state_sync_timeout = self.config.state_sync_timeout; - let block_header = self.chain.get_block(&sync_hash)?.header().clone(); + let block_header = self.chain.get_block(&epoch_first_block)?.header().clone(); let epoch_id = block_header.epoch_id(); - let (state_sync, status, blocks_catch_up_state) = - self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| { - tracing::debug!(target: "client", ?sync_hash, "inserting new state sync"); - ( - StateSync::new( + let sync_hash = match self.get_catchup_sync_hash(&mut state_sync_info, &block_header)? 
{ + Some(h) => h, + None => continue, + }; + + let CatchupState { state_sync, sync_status: status, catchup } = self + .catchup_state_syncs + .entry(sync_hash) + .or_insert_with(|| { + tracing::debug!(target: "client", ?epoch_first_block, ?sync_hash, "inserting new state sync"); + CatchupState { + state_sync: StateSync::new( self.clock.clone(), self.runtime_adapter.store().clone(), self.epoch_manager.clone(), @@ -2492,21 +2665,20 @@ impl Client { self.state_sync_future_spawner.clone(), true, ), - StateSyncStatus { + sync_status: StateSyncStatus { sync_hash, sync_status: HashMap::new(), download_tasks: Vec::new(), computation_tasks: Vec::new(), }, - BlocksCatchUpState::new(sync_hash, *epoch_id), - ) + catchup: BlocksCatchUpState::new(sync_hash, *epoch_id), + } }); - // For colour decorators to work, they need to printed directly. Otherwise the decorators get escaped, garble output and don't add colours. debug!(target: "catchup", ?me, ?sync_hash, progress_per_shard = ?status.sync_status, "Catchup"); let tracking_shards: Vec = - state_sync_info.shards.iter().map(|tuple| tuple.0).collect(); + state_sync_info.shards().iter().map(|tuple| tuple.0).collect(); // Initialize the new shard sync to contain the shards to split at // first. It will get updated with the shard sync download status @@ -2518,19 +2690,20 @@ impl Client { self.chain.catchup_blocks_step( &me, &sync_hash, - blocks_catch_up_state, + catchup, block_catch_up_task_scheduler, )?; - if blocks_catch_up_state.is_finished() { + if catchup.is_finished() { let mut block_processing_artifacts = BlockProcessingArtifact::default(); self.chain.finish_catchup_blocks( &me, + &epoch_first_block, &sync_hash, &mut block_processing_artifacts, apply_chunks_done_sender.clone(), - &blocks_catch_up_state.done_blocks, + &catchup.done_blocks, )?; self.process_block_processing_artifact(block_processing_artifacts, &signer); @@ -2716,11 +2889,11 @@ impl Client { impl Client { pub fn get_catchup_status(&self) -> Result, near_chain::Error> { let mut ret = vec![]; - for (sync_hash, (_, shard_sync_state, block_catchup_state)) in + for (sync_hash, CatchupState { sync_status, catchup, .. 
}) in self.catchup_state_syncs.iter() { let sync_block_height = self.chain.get_block_header(sync_hash)?.height(); - let shard_sync_status: HashMap<_, _> = shard_sync_state + let shard_sync_status: HashMap<_, _> = sync_status .sync_status .iter() .map(|(shard_id, state)| (*shard_id, state.to_string())) @@ -2729,7 +2902,7 @@ impl Client { sync_block_hash: *sync_hash, sync_block_height, shard_sync_status, - blocks_to_catchup: self.chain.get_block_catchup_status(block_catchup_state), + blocks_to_catchup: self.chain.get_block_catchup_status(catchup), }); } Ok(ret) diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 3f5ef59fcbd..7516eaf49aa 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -7,7 +7,7 @@ #[cfg(feature = "test_features")] use crate::client::AdvProduceBlocksMode; -use crate::client::{Client, EPOCH_START_INFO_BLOCKS}; +use crate::client::{CatchupState, Client, EPOCH_START_INFO_BLOCKS}; use crate::config_updater::ConfigUpdater; use crate::debug::new_network_info_view; use crate::info::{display_sync_status, InfoHelper}; @@ -15,7 +15,7 @@ use crate::stateless_validation::partial_witness::partial_witness_actor::Partial use crate::sync::state::chain_requests::{ ChainFinalizationRequest, ChainSenderForStateSync, StateHeaderValidationRequest, }; -use crate::sync::state::{get_epoch_start_sync_hash, StateSyncResult}; +use crate::sync::state::StateSyncResult; use crate::sync_jobs_actor::{ClientSenderForSyncJobs, SyncJobsActor}; use crate::{metrics, StatusResponse}; use actix::Actor; @@ -623,7 +623,9 @@ impl Handler for ClientActorInner { } // ... Or one of the catchups - if let Some((state_sync, _, _)) = self.client.catchup_state_syncs.get_mut(&hash) { + if let Some(CatchupState { state_sync, .. }) = + self.client.catchup_state_syncs.get_mut(&hash) + { if let Err(err) = state_sync.apply_peer_message(peer_id, shard_id, hash, state_response) { tracing::error!(?err, "Error applying catchup state sync response"); @@ -1574,21 +1576,22 @@ impl ClientActorInner { /// /// The selected block will always be the first block on a new epoch: /// . - fn find_sync_hash(&mut self) -> Result { + fn find_sync_hash(&self) -> Result, near_chain::Error> { let header_head = self.client.chain.header_head()?; - let sync_hash = header_head.last_block_hash; - let epoch_start_sync_hash = get_epoch_start_sync_hash(&mut self.client.chain, &sync_hash)?; + let sync_hash = match self.client.chain.get_sync_hash(&header_head.last_block_hash)? { + Some(h) => h, + None => return Ok(None), + }; let genesis_hash = self.client.chain.genesis().hash(); tracing::debug!( target: "sync", ?header_head, ?sync_hash, - ?epoch_start_sync_hash, ?genesis_hash, "find_sync_hash"); - assert_ne!(&epoch_start_sync_hash, genesis_hash); - Ok(epoch_start_sync_hash) + assert_ne!(&sync_hash, genesis_hash); + Ok(Some(sync_hash)) } /// Runs catchup on repeat, if this client is a validator. @@ -1729,7 +1732,8 @@ impl ClientActorInner { let sync_hash = match &self.client.sync_status { SyncStatus::StateSync(s) => s.sync_hash, - _ => unreachable!("Sync status should have been StateSync!"), + // sync hash isn't known yet. Return and try again later. + _ => return, }; let me = signer.as_ref().map(|x| x.validator_id().clone()); @@ -1885,7 +1889,11 @@ impl ClientActorInner { return Ok(()); } - let sync_hash = self.find_sync_hash()?; + let sync_hash = if let Some(sync_hash) = self.find_sync_hash()? 
{ + sync_hash + } else { + return Ok(()); + }; if !self.client.config.archive { self.client.chain.mut_chain_store().reset_data_pre_state_sync( sync_hash, @@ -2093,11 +2101,11 @@ impl ClientActorInner { impl Handler for ClientActorInner { fn handle(&mut self, msg: BlockCatchUpResponse) { tracing::debug!(target: "client", ?msg); - if let Some((_, _, blocks_catch_up_state)) = + if let Some(CatchupState { catchup, .. }) = self.client.catchup_state_syncs.get_mut(&msg.sync_hash) { - assert!(blocks_catch_up_state.scheduled_blocks.remove(&msg.block_hash)); - blocks_catch_up_state.processed_blocks.insert( + assert!(catchup.scheduled_blocks.remove(&msg.block_hash)); + catchup.processed_blocks.insert( msg.block_hash, msg.results.into_iter().map(|res| res.1).collect::>(), ); diff --git a/chain/client/src/test_utils/client.rs b/chain/client/src/test_utils/client.rs index af4eb70fbaa..72555c486f1 100644 --- a/chain/client/src/test_utils/client.rs +++ b/chain/client/src/test_utils/client.rs @@ -5,7 +5,7 @@ use std::mem::swap; use std::sync::{Arc, RwLock}; -use crate::client::ProduceChunkResult; +use crate::client::{CatchupState, ProduceChunkResult}; use crate::Client; use actix_rt::System; use itertools::Itertools; @@ -308,11 +308,11 @@ pub fn run_catchup( .into_iter() .map(|res| res.1) .collect_vec(); - if let Some((_, _, blocks_catch_up_state)) = + if let Some(CatchupState { catchup, .. }) = client.catchup_state_syncs.get_mut(&msg.sync_hash) { - assert!(blocks_catch_up_state.scheduled_blocks.remove(&msg.block_hash)); - blocks_catch_up_state.processed_blocks.insert(msg.block_hash, results); + assert!(catchup.scheduled_blocks.remove(&msg.block_hash)); + catchup.processed_blocks.insert(msg.block_hash, results); } else { panic!("block catch up processing result from unknown sync hash"); } diff --git a/chain/client/src/tests/query_client.rs b/chain/client/src/tests/query_client.rs index 12cbd1df494..a2ce1c4f997 100644 --- a/chain/client/src/tests/query_client.rs +++ b/chain/client/src/tests/query_client.rs @@ -1,18 +1,13 @@ -use crate::test_utils::{setup_no_network, setup_only_view}; +use crate::test_utils::setup_no_network; use crate::{ GetBlock, GetBlockWithMerkleTree, GetExecutionOutcomesForBlock, Query, Status, TxStatus, }; use actix::System; use futures::{future, FutureExt}; use near_actix_test_utils::run_actix; -use near_async::messaging::IntoMultiSender; use near_async::time::{Clock, Duration}; -use near_chain::test_utils::ValidatorSchedule; use near_crypto::{InMemorySigner, KeyType}; -use near_network::client::{ - BlockResponse, ProcessTxRequest, ProcessTxResponse, StateRequestHeader, -}; -use near_network::test_utils::MockPeerManagerAdapter; +use near_network::client::{BlockResponse, ProcessTxRequest, ProcessTxResponse}; use near_network::types::PeerInfo; use near_o11y::testonly::init_test_logger; use near_o11y::WithSpanContextExt; @@ -20,7 +15,7 @@ use near_primitives::block::{Block, BlockHeader}; use near_primitives::merkle::PartialMerkleTree; use near_primitives::test_utils::create_test_signer; use near_primitives::transaction::SignedTransaction; -use near_primitives::types::{BlockId, BlockReference, EpochId, ShardId}; +use near_primitives::types::{BlockReference, EpochId, ShardId}; use near_primitives::version::PROTOCOL_VERSION; use near_primitives::views::{QueryRequest, QueryResponseKind}; use num_rational::Ratio; @@ -217,61 +212,3 @@ fn test_execution_outcome_for_chunk() { near_network::test_utils::wait_or_panic(5000); }); } - -#[test] -fn test_state_request() { - run_actix(async { - let vs = 
- ValidatorSchedule::new().block_producers_per_epoch(vec![vec!["test".parse().unwrap()]]); - let view_client = setup_only_view( - Clock::real(), - vs, - 10000000, - "test".parse().unwrap(), - true, - 200, - 400, - false, - true, - true, - MockPeerManagerAdapter::default().into_multi_sender(), - 100, - ); - actix::spawn(async move { - actix::clock::sleep(std::time::Duration::from_millis(500)).await; - let block_hash = view_client - .send(GetBlock(BlockReference::BlockId(BlockId::Height(0))).with_span_context()) - .await - .unwrap() - .unwrap() - .header - .hash; - for _ in 0..30 { - let res = view_client - .send( - StateRequestHeader { shard_id: ShardId::new(0), sync_hash: block_hash } - .with_span_context(), - ) - .await - .unwrap(); - assert!(res.is_some()); - } - - // immediately query again, should be rejected - let shard_id = ShardId::new(0); - let res = view_client - .send(StateRequestHeader { shard_id, sync_hash: block_hash }.with_span_context()) - .await - .unwrap(); - assert!(res.is_none()); - actix::clock::sleep(std::time::Duration::from_secs(40)).await; - let res = view_client - .send(StateRequestHeader { shard_id, sync_hash: block_hash }.with_span_context()) - .await - .unwrap(); - assert!(res.is_some()); - System::current().stop(); - }); - near_network::test_utils::wait_or_panic(50000); - }); -} diff --git a/core/chain-configs/src/test_genesis.rs b/core/chain-configs/src/test_genesis.rs index e6c6488b823..0c6acb9d8c7 100644 --- a/core/chain-configs/src/test_genesis.rs +++ b/core/chain-configs/src/test_genesis.rs @@ -193,15 +193,18 @@ impl TestGenesisBuilder { pub fn validators_raw( &mut self, validators: Vec, - num_block_and_chunk_producer_seats: NumSeats, + num_block_producer_seats: NumSeats, + num_chunk_producer_seats: NumSeats, num_chunk_validator_only_seats: NumSeats, ) -> &mut Self { + let num_chunk_validator_seats = + std::cmp::max(num_block_producer_seats, num_chunk_producer_seats) + + num_chunk_validator_only_seats; self.validators = Some(ValidatorsSpec::Raw { validators, - num_block_producer_seats: num_block_and_chunk_producer_seats, - num_chunk_producer_seats: num_block_and_chunk_producer_seats, - num_chunk_validator_seats: num_block_and_chunk_producer_seats - + num_chunk_validator_only_seats, + num_block_producer_seats, + num_chunk_producer_seats, + num_chunk_validator_seats, }); self } diff --git a/core/primitives-core/src/version.rs b/core/primitives-core/src/version.rs index e81044ec287..09ac0a9f168 100644 --- a/core/primitives-core/src/version.rs +++ b/core/primitives-core/src/version.rs @@ -178,6 +178,11 @@ pub enum ProtocolFeature { ExcludeContractCodeFromStateWitness, /// A scheduler which limits bandwidth for sending receipts between shards. BandwidthScheduler, + /// Indicates that the "sync_hash" used to identify the point in the chain to sync state to + /// should no longer be the first block of the epoch, but a couple blocks after that in order + /// to sync the current epoch's state. This is not strictly a protocol feature, but is included + /// here to coordinate among nodes + StateSyncHashUpdate, } impl ProtocolFeature { @@ -254,6 +259,7 @@ impl ProtocolFeature { // TODO(#11201): When stabilizing this feature in mainnet, also remove the temporary code // that always enables this for mocknet (see config_mocknet function). 
ProtocolFeature::ShuffleShardAssignments => 143, + ProtocolFeature::StateSyncHashUpdate => 144, ProtocolFeature::SimpleNightshadeV4 => 145, ProtocolFeature::BandwidthScheduler => 146, diff --git a/core/primitives/src/sharding.rs b/core/primitives/src/sharding.rs index 5ebcc180e9e..7813292e113 100644 --- a/core/primitives/src/sharding.rs +++ b/core/primitives/src/sharding.rs @@ -1,8 +1,10 @@ use crate::bandwidth_scheduler::BandwidthRequests; +use crate::block::Block; use crate::congestion_info::CongestionInfo; use crate::hash::{hash, CryptoHash}; use crate::merkle::{combine_hash, merklize, verify_path, MerklePath}; use crate::receipt::Receipt; +use crate::shard_layout::ShardLayout; use crate::transaction::SignedTransaction; use crate::types::validator_stake::{ValidatorStake, ValidatorStakeIter, ValidatorStakeV1}; use crate::types::{Balance, BlockHeight, Gas, MerkleHash, ShardId, StateRoot}; @@ -58,18 +60,123 @@ impl From for ChunkHash { } } -#[derive(Debug, PartialEq, BorshSerialize, BorshDeserialize)] +#[derive(Clone, Debug, PartialEq, BorshSerialize, BorshDeserialize)] pub struct ShardInfo(pub ShardId, pub ChunkHash); -/// Contains the information that is used to sync state for shards as epochs switch -#[derive(Debug, PartialEq, BorshSerialize, BorshDeserialize)] -pub struct StateSyncInfo { - /// The first block of the epoch for which syncing is happening - pub epoch_tail_hash: CryptoHash, +impl ShardInfo { + fn new(prev_block: &Block, shard_layout: &ShardLayout, shard_id: ShardId) -> Self { + let shard_index = shard_layout.get_shard_index(shard_id); + let chunk = &prev_block.chunks()[shard_index]; + Self(shard_id, chunk.chunk_hash()) + } +} + +/// This version of the type is used in the old state sync, where we sync to the state right before the new epoch +#[derive(Clone, Debug, PartialEq, BorshSerialize, BorshDeserialize)] pub struct StateSyncInfoV0 { + /// The "sync_hash" block referred to in the state sync algorithm. This is the first block of the + /// epoch we want to state sync for. This field is not strictly required since this struct is keyed + /// by this hash in the database, but it's a small amount of data that makes the info in this type more complete. + pub sync_hash: CryptoHash, + /// Shards to fetch state + pub shards: Vec, +} + +/// This version of the type is used when syncing to the current epoch's state, and `sync_hash` is an +/// Option because it is not known at the beginning of the epoch, but only once a few more blocks have been produced. +#[derive(Clone, Debug, PartialEq, BorshSerialize, BorshDeserialize)] +pub struct StateSyncInfoV1 { + /// The first block of the epoch we want to state sync for. This field is not strictly required since + /// this struct is keyed by this hash in the database, but it's a small amount of data that makes + /// the info in this type more complete. + pub epoch_first_block: CryptoHash, + /// The block we'll use as the "sync_hash" when state syncing. Previously, state sync + /// used the first block of an epoch as the "sync_hash", and synced state to the epoch before. + /// Now that state sync downloads the state of the current epoch, we need to wait a few blocks + /// after applying the first block in an epoch to know what "sync_hash" we'll use, so this field + /// is first set to None until we find the right "sync_hash".
+ pub sync_hash: Option, /// Shards to fetch state pub shards: Vec, } +/// Contains the information that is used to sync state for shards as epochs switch +/// Currently there is only one version possible, but an improvement we might want to make in the future +/// is that when syncing to the current epoch's state, we currently wait for two new chunks in each shard, but +/// with some changes to the meaning of the "sync_hash", we should only need to wait for one. So this is included +/// in order to allow for this change in the future without needing another database migration. +#[derive(Clone, Debug, PartialEq, BorshSerialize, BorshDeserialize)] +pub enum StateSyncInfo { + /// Old state sync: sync to the state right before the new epoch + V0(StateSyncInfoV0), + /// New state sync: sync to the state right after the new epoch + V1(StateSyncInfoV1), +} + +fn shard_infos( + prev_block: &Block, + shard_layout: &ShardLayout, + shards: &[ShardId], +) -> Vec { + shards.iter().map(|shard_id| ShardInfo::new(prev_block, shard_layout, *shard_id)).collect() +} + +impl StateSyncInfo { + pub fn new_previous_epoch( + epoch_first_block: CryptoHash, + prev_block: &Block, + shard_layout: &ShardLayout, + shards: &[ShardId], + ) -> Self { + Self::V0(StateSyncInfoV0 { + sync_hash: epoch_first_block, + shards: shard_infos(prev_block, shard_layout, shards), + }) + } + + fn new_current_epoch( + epoch_first_block: CryptoHash, + prev_block: &Block, + shard_layout: &ShardLayout, + shards: &[ShardId], + ) -> Self { + Self::V1(StateSyncInfoV1 { + epoch_first_block, + sync_hash: None, + shards: shard_infos(prev_block, shard_layout, shards), + }) + } + + pub fn new( + protocol_version: ProtocolVersion, + epoch_first_block: CryptoHash, + prev_block: &Block, + shard_layout: &ShardLayout, + shards: &[ShardId], + ) -> Self { + if ProtocolFeature::StateSyncHashUpdate.enabled(protocol_version) { + Self::new_current_epoch(epoch_first_block, prev_block, shard_layout, shards) + } else { + Self::new_previous_epoch(epoch_first_block, prev_block, shard_layout, shards) + } + } + + /// Block hash that identifies this state sync struct on disk + pub fn epoch_first_block(&self) -> &CryptoHash { + match self { + Self::V0(info) => &info.sync_hash, + Self::V1(info) => &info.epoch_first_block, + } + } + + pub fn shards(&self) -> &[ShardInfo] { + match self { + Self::V0(info) => &info.shards, + Self::V1(info) => &info.shards, + } + } +} + pub mod shard_chunk_header_inner; pub use shard_chunk_header_inner::{ ShardChunkHeaderInner, ShardChunkHeaderInnerV1, ShardChunkHeaderInnerV2, diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 917ffcf8f8e..2066866bddf 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -335,6 +335,15 @@ impl Store { self.storage.iter(col) } + pub fn iter_ser<'a, T: BorshDeserialize>( + &'a self, + col: DBCol, + ) -> impl Iterator, T)>> + 'a { + self.storage + .iter(col) + .map(|item| item.and_then(|(key, value)| Ok((key, T::try_from_slice(value.as_ref())?)))) + } + /// Fetches raw key/value pairs from the database. /// /// Practically, this means that for rc columns rc is included in the value. diff --git a/core/store/src/metadata.rs b/core/store/src/metadata.rs index eec060fdd5c..f14473cddb3 100644 --- a/core/store/src/metadata.rs +++ b/core/store/src/metadata.rs @@ -2,7 +2,7 @@ pub type DbVersion = u32; /// Current version of the database. -pub const DB_VERSION: DbVersion = 41; +pub const DB_VERSION: DbVersion = 42; /// Database version at which point DbKind was introduced. 
const DB_VERSION_WITH_KIND: DbVersion = 34; diff --git a/core/store/src/migrations.rs b/core/store/src/migrations.rs index 48ae8d25be5..29c625d9951 100644 --- a/core/store/src/migrations.rs +++ b/core/store/src/migrations.rs @@ -1,10 +1,12 @@ use crate::metadata::DbKind; use crate::{DBCol, Store, StoreUpdate}; +use anyhow::Context; use borsh::{BorshDeserialize, BorshSerialize}; use near_primitives::challenge::PartialState; use near_primitives::epoch_manager::EpochSummary; use near_primitives::epoch_manager::AGGREGATOR_KEY; use near_primitives::hash::CryptoHash; +use near_primitives::sharding::{ShardInfo, StateSyncInfo, StateSyncInfoV0}; use near_primitives::state::FlatStateValue; use near_primitives::stateless_validation::stored_chunk_state_transition_data::{ StoredChunkStateTransitionData, StoredChunkStateTransitionDataV1, @@ -118,9 +120,7 @@ impl<'a> BatchedStoreUpdate<'a> { /// new blocks. pub fn migrate_32_to_33(store: &Store) -> anyhow::Result<()> { let mut update = BatchedStoreUpdate::new(&store, 10_000_000); - for row in - store.iter_prefix_ser::>(DBCol::_TransactionResult, &[]) - { + for row in store.iter_ser::>(DBCol::_TransactionResult) { let (_, mut outcomes) = row?; // It appears that it was possible that the same entry in the original column contained // duplicate outcomes. We remove them here to avoid panicing due to issuing a @@ -363,7 +363,7 @@ pub fn migrate_39_to_40(store: &Store) -> anyhow::Result<()> { Ok(()) } -/// Migrates the database from version 39 to 40. +/// Migrates the database from version 40 to 41. /// /// The migraton replaces non-enum StoredChunkStateTransitionData struct with its enum version. pub fn migrate_40_to_41(store: &Store) -> anyhow::Result<()> { @@ -391,3 +391,34 @@ pub fn migrate_40_to_41(store: &Store) -> anyhow::Result<()> { update.commit()?; Ok(()) } + +/// Migrates the database from version 41 to 42. +/// +/// This rewrites the contents of the StateDlInfos column +pub fn migrate_41_to_42(store: &Store) -> anyhow::Result<()> { + #[derive(BorshSerialize, BorshDeserialize)] + struct LegacyStateSyncInfo { + sync_hash: CryptoHash, + shards: Vec, + } + + let mut update = store.store_update(); + + for row in store.iter_ser::(DBCol::StateDlInfos) { + let (key, LegacyStateSyncInfo { sync_hash, shards }) = + row.context("failed deserializing legacy StateSyncInfo in StateDlInfos")?; + + let epoch_first_block = CryptoHash::try_from_slice(&key) + .context("failed deserializing CryptoHash key in StateDlInfos")?; + + if epoch_first_block != sync_hash { + tracing::warn!(key = %epoch_first_block, %sync_hash, "sync_hash field of legacy StateSyncInfo not equal to the key. Something is wrong with this node's catchup info"); + } + let new_info = StateSyncInfo::V0(StateSyncInfoV0 { sync_hash, shards }); + update + .set_ser(DBCol::StateDlInfos, &key, &new_info) + .context("failed writing to StateDlInfos")?; + } + update.commit()?; + Ok(()) +} diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index debe2d0e903..f32fe6182fa 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -53,6 +53,12 @@ enum DropConditionKind { /// Whether test loop should drop all chunks in the given range of heights /// relative to first block height where protocol version changes. ProtocolUpgradeChunkRange((ProtocolVersion, HashMap>)), + /// Specifies the chunks that should be produced by their appearance in the + /// chain with respect to the start of an epoch. 
That is, a given chunk at height + /// `height_created` for shard `shard_id` will be produced if + /// self.0[`shard_id`][`height_created` - `epoch_start`] is true, or if + /// `height_created` - `epoch_start` > self.0[`shard_id`].len() + ChunksProducedByHeight(HashMap>), } pub(crate) struct TestLoopBuilder { @@ -104,6 +110,33 @@ fn is_chunk_validated_by( return chunk_validators.contains(&account_id); } +/// returns !chunks_produced[shard_id][height_created - epoch_start]. +fn should_drop_chunk_by_height( + epoch_manager_adapter: Arc, + chunk: ShardChunkHeader, + chunks_produced: HashMap>, +) -> bool { + let prev_block_hash = chunk.prev_block_hash(); + let shard_id = chunk.shard_id(); + let height_created = chunk.height_created(); + + let height_in_epoch = + if epoch_manager_adapter.is_next_block_epoch_start(prev_block_hash).unwrap() { + 0 + } else { + let epoch_start = + epoch_manager_adapter.get_epoch_start_height(prev_block_hash).unwrap(); + height_created - epoch_start + }; + let Some(chunks_produced) = chunks_produced.get(&shard_id) else { + return false; + }; + let Some(should_produce) = chunks_produced.get(height_in_epoch as usize) else { + return false; + }; + !*should_produce +} + /// Returns true if the chunk should be dropped based on the /// `DropCondition::ProtocolUpgradeChunkRange`. fn should_drop_chunk_for_protocol_upgrade( @@ -220,6 +253,22 @@ fn register_drop_condition( drop_chunks_condition, )); } + DropConditionKind::ChunksProducedByHeight(chunks_produced) => { + let inner_epoch_manager_adapter = epoch_manager_adapter.clone(); + let chunks_produced = chunks_produced.clone(); + let drop_chunks_condition = Box::new(move |chunk: ShardChunkHeader| -> bool { + should_drop_chunk_by_height( + inner_epoch_manager_adapter.clone(), + chunk, + chunks_produced.clone(), + ) + }); + peer_manager_actor.register_override_handler(chunk_endorsement_dropper_by_hash( + chunks_storage, + epoch_manager_adapter.clone(), + drop_chunks_condition, + )); + } } } @@ -311,6 +360,17 @@ impl TestLoopBuilder { self } + pub(crate) fn drop_chunks_by_height( + mut self, + chunks_produced: HashMap>, + ) -> Self { + if !chunks_produced.is_empty() { + self.drop_condition_kinds + .push(DropConditionKind::ChunksProducedByHeight(chunks_produced)); + } + self + } + pub(crate) fn gc_num_epochs_to_keep(mut self, num_epochs: u64) -> Self { self.gc_num_epochs_to_keep = Some(num_epochs); self diff --git a/integration-tests/src/test_loop/tests/fix_min_stake_ratio.rs b/integration-tests/src/test_loop/tests/fix_min_stake_ratio.rs index 8daac57260a..b2ec007544c 100644 --- a/integration-tests/src/test_loop/tests/fix_min_stake_ratio.rs +++ b/integration-tests/src/test_loop/tests/fix_min_stake_ratio.rs @@ -69,7 +69,7 @@ fn test_fix_min_stake_ratio() { .shard_layout(epoch_config_store.get_config(protocol_version).as_ref().shard_layout.clone()) .protocol_version(protocol_version) .epoch_length(epoch_length) - .validators_raw(validators, 1, 2) + .validators_raw(validators, 1, 1, 2) // Disable validator rewards. 
.max_inflation_rate(Rational32::new(0, 1)); for account in &accounts { diff --git a/integration-tests/src/test_loop/tests/mod.rs b/integration-tests/src/test_loop/tests/mod.rs index 0d0b06f9241..a9ac6de8eee 100644 --- a/integration-tests/src/test_loop/tests/mod.rs +++ b/integration-tests/src/test_loop/tests/mod.rs @@ -11,5 +11,6 @@ pub mod multinode_test_loop_example; pub mod protocol_upgrade; mod resharding_v3; pub mod simple_test_loop_example; +pub mod state_sync; pub mod syncing; pub mod view_requests_to_archival_node; diff --git a/integration-tests/src/test_loop/tests/state_sync.rs b/integration-tests/src/test_loop/tests/state_sync.rs new file mode 100644 index 00000000000..94d79167379 --- /dev/null +++ b/integration-tests/src/test_loop/tests/state_sync.rs @@ -0,0 +1,359 @@ +use near_async::messaging::{Handler, SendAsync}; +use near_async::test_loop::TestLoopV2; +use near_async::time::Duration; +use near_chain_configs::test_genesis::TestGenesisBuilder; +use near_network::client::{ProcessTxRequest, StateRequestHeader}; +use near_o11y::testonly::init_test_logger; +use near_primitives::hash::CryptoHash; +use near_primitives::test_utils::create_user_test_signer; +use near_primitives::transaction::SignedTransaction; +use near_primitives::types::{AccountId, AccountInfo, BlockHeightDelta, Nonce, NumSeats, ShardId}; + +use crate::test_loop::builder::TestLoopBuilder; +use crate::test_loop::env::{TestData, TestLoopEnv}; +use crate::test_loop::utils::transactions::get_anchor_hash; +use crate::test_loop::utils::ONE_NEAR; + +use itertools::Itertools; +use std::collections::HashMap; + +const EPOCH_LENGTH: BlockHeightDelta = 40; + +struct ShardAccounts { + boundary_accounts: Vec, + accounts: Vec>, +} + +fn generate_accounts(num_shards: usize) -> ShardAccounts { + let accounts_per_shard = 5; + + if num_shards > 27 { + todo!("don't know how to include more than 27 shards yet!"); + } + let mut boundary_accounts = Vec::::new(); + for c in b'a'..=b'z' { + if boundary_accounts.len() + 1 >= num_shards { + break; + } + let mut boundary_account = format!("{}", c as char); + while boundary_account.len() < AccountId::MIN_LEN { + boundary_account.push('0'); + } + boundary_accounts.push(boundary_account); + } + + let mut accounts = Vec::new(); + let mut account_base = "0"; + for a in boundary_accounts.iter() { + accounts.push( + (0..accounts_per_shard) + .map(|i| (format!("{}{}", account_base, i).parse().unwrap(), 1)) + .collect::>(), + ); + account_base = a.as_str(); + } + accounts.push( + (0..accounts_per_shard) + .map(|i| (format!("{}{}", account_base, i).parse().unwrap(), 1)) + .collect::>(), + ); + + ShardAccounts { boundary_accounts, accounts } +} + +struct TestState { + env: TestLoopEnv, + accounts: Vec>, +} + +fn setup_initial_blockchain( + num_shards: usize, + chunks_produced: HashMap>, +) -> TestState { + let builder = TestLoopBuilder::new(); + + let num_block_producer_seats = 1; + let num_chunk_producer_seats = num_shards; + let num_validators = std::cmp::max(num_block_producer_seats, num_chunk_producer_seats); + let validators = (0..num_validators) + .map(|i| { + let account_id = format!("node{}", i); + AccountInfo { + account_id: account_id.parse().unwrap(), + public_key: near_primitives::test_utils::create_test_signer(account_id.as_str()) + .public_key(), + amount: 10000 * ONE_NEAR, + } + }) + .collect::>(); + let clients = validators.iter().map(|v| v.account_id.clone()).collect::>(); + + let ShardAccounts { boundary_accounts, accounts } = generate_accounts(num_shards); + + let mut genesis_builder = 
TestGenesisBuilder::new(); + genesis_builder + .genesis_time_from_clock(&builder.clock()) + .protocol_version_latest() + .genesis_height(10000) + .epoch_length(EPOCH_LENGTH) + .gas_prices_free() + .gas_limit_one_petagas() + .shard_layout_simple_v1(&boundary_accounts.iter().map(|s| s.as_str()).collect::<Vec<_>>()) + .transaction_validity_period(1000) + .epoch_length(10) + .validators_raw( + validators, + num_block_producer_seats as NumSeats, + num_chunk_producer_seats as NumSeats, + 0, + ) + // shuffle the shard assignment so that nodes will have to state sync to catch up on future tracked shards. + // This part is the only reference to state sync at all in this test, since all we check is that the blockchain + // progresses for a few epochs, meaning that state sync must have been successful. + .shuffle_shard_assignment_for_chunk_producers(true); + for accounts in accounts.iter() { + for (account, _nonce) in accounts.iter() { + genesis_builder.add_user_account_simple(account.clone(), 10000 * ONE_NEAR); + } + } + let (genesis, epoch_config_store) = genesis_builder.build(); + + let env = builder + .genesis(genesis) + .epoch_config_store(epoch_config_store) + .clients(clients) + .drop_chunks_by_height(chunks_produced) + .build(); + + TestState { env, accounts } +} + +fn get_wrapped<T>(s: &[T], idx: usize) -> &T { + &s[idx % s.len()] +} + +fn get_wrapped_mut<T>(s: &mut [T], idx: usize) -> &mut T { + &mut s[idx % s.len()] +} + +/// tries to generate transactions between lots of different pairs of shards (accounts for shard i are in accounts[i]) +fn send_txs_between_shards( + test_loop: &mut TestLoopV2, + node_data: &[TestData], + accounts: &mut [Vec<(AccountId, Nonce)>], +) { + let clients = node_data + .iter() + .map(|data| &test_loop.data.get(&data.client_sender.actor_handle()).client) + .collect_vec(); + let block_hash = get_anchor_hash(&clients); + + let num_shards = accounts.len(); + + // which client should we send txs to next? + let mut client_idx = 0; + let mut from_shard = 0; + // which account should we choose among all the accounts of a shard? + let mut account_idx = 0; + let mut shard_diff = 1; + + let mut txs_sent = 0; + while txs_sent < 200 { + let to_shard = (from_shard + shard_diff) % num_shards; + let (receiver, _nonce) = get_wrapped(&accounts[to_shard], account_idx); + let receiver = receiver.clone(); + let (sender, nonce) = get_wrapped_mut(&mut accounts[from_shard], account_idx); + + let tx = SignedTransaction::send_money( + *nonce, + sender.clone(), + receiver.clone(), + &create_user_test_signer(sender).into(), + 1000, + block_hash, + ); + *nonce += 1; + + let future = get_wrapped(node_data, client_idx) + .client_sender + .clone() + //.with_delay(Duration::milliseconds(300 * txs_sent as i64)) + .send_async(ProcessTxRequest { + transaction: tx, + is_forwarded: false, + check_only: false, + }); + drop(future); + + txs_sent += 1; + from_shard = (from_shard + 1) % num_shards; + if from_shard == 0 { + shard_diff += 1; + } + account_idx += 1; + client_idx = 1; + } +} + +/// runs the network and sends transactions at the beginning of each epoch. 
At the end, the condition we're +/// looking for is just that a few epochs have passed, because that should only be possible if state sync was successful +/// (which will be required because we enable chunk producer shard shuffling on this chain) +fn produce_chunks(env: &mut TestLoopEnv, mut accounts: Vec<Vec<(AccountId, Nonce)>>) { + let handle = env.datas[0].client_sender.actor_handle(); + let client = &env.test_loop.data.get(&handle).client; + let mut tip = client.chain.head().unwrap(); + // TODO: make this more precise. We don't have to wait 2 whole seconds, but the amount we wait will + // depend on whether this block is meant to have skipped chunks. + let timeout = client.config.min_block_production_delay + Duration::seconds(2); + + let mut epoch_id_switches = 0; + loop { + env.test_loop.run_until( + |data| { + let client = &data.get(&handle).client; + let new_tip = client.chain.head().unwrap(); + new_tip.height != tip.height + }, + timeout, + ); + + let handle = env.datas[0].client_sender.actor_handle(); + let client = &env.test_loop.data.get(&handle).client; + let new_tip = client.chain.head().unwrap(); + let header = client.chain.get_block_header(&tip.last_block_hash).unwrap(); + tracing::debug!("chunk mask for #{} {:?}", header.height(), header.chunk_mask()); + + if new_tip.epoch_id != tip.epoch_id { + epoch_id_switches += 1; + if epoch_id_switches > 2 { + break; + } + send_txs_between_shards(&mut env.test_loop, &env.datas, &mut accounts); + } + tip = new_tip; + } +} + +fn run_test(state: TestState) { + let TestState { mut env, mut accounts } = state; + let handle = env.datas[0].client_sender.actor_handle(); + let client = &env.test_loop.data.get(&handle).client; + let first_epoch_time = client.config.min_block_production_delay + * u32::try_from(EPOCH_LENGTH).unwrap_or(u32::MAX) + + Duration::seconds(2); + + send_txs_between_shards(&mut env.test_loop, &env.datas, &mut accounts); + + env.test_loop.run_until( + |data| { + let handle = env.datas[0].client_sender.actor_handle(); + let client = &data.get(&handle).client; + let tip = client.chain.head().unwrap(); + tip.epoch_id != Default::default() + }, + first_epoch_time, + ); + + produce_chunks(&mut env, accounts); + env.shutdown_and_drain_remaining_events(Duration::seconds(3)); +} + +#[derive(Debug)] +struct StateSyncTest { + num_shards: usize, + chunks_produced: &'static [(ShardId, &'static [bool])], +} + +static TEST_CASES: &[StateSyncTest] = &[ + // The first two make no modifications to chunks_produced, and all chunks should be produced. 
This is the normal case + StateSyncTest { num_shards: 2, chunks_produced: &[] }, + StateSyncTest { num_shards: 4, chunks_produced: &[] }, + // Now we miss some chunks at the beginning of the epoch + StateSyncTest { + num_shards: 4, + chunks_produced: &[ + (ShardId::new(0), &[false]), + (ShardId::new(1), &[true]), + (ShardId::new(2), &[true]), + (ShardId::new(3), &[true]), + ], + }, + StateSyncTest { + num_shards: 4, + chunks_produced: &[(ShardId::new(0), &[true, false]), (ShardId::new(1), &[true, false])], + }, + StateSyncTest { + num_shards: 4, + chunks_produced: &[ + (ShardId::new(0), &[false, true]), + (ShardId::new(2), &[true, false, true]), + ], + }, +]; + +#[test] +fn test_state_sync_current_epoch() { + init_test_logger(); + + for t in TEST_CASES.iter() { + tracing::info!("run test: {:?}", t); + let state = setup_initial_blockchain( + t.num_shards, + t.chunks_produced + .iter() + .map(|(shard_id, produced)| (*shard_id, produced.to_vec())) + .collect(), + ); + run_test(state); + } +} + +fn spam_state_sync_header_reqs(env: &mut TestLoopEnv, sync_hash: CryptoHash) { + let view_client_handle = env.datas[0].view_client_sender.actor_handle(); + let view_client = env.test_loop.data.get_mut(&view_client_handle); + + for _ in 0..30 { + let res = view_client.handle(StateRequestHeader { shard_id: ShardId::new(0), sync_hash }); + assert!(res.is_some()); + } + + // immediately query again, should be rejected + let shard_id = ShardId::new(0); + let res = view_client.handle(StateRequestHeader { shard_id, sync_hash }); + assert!(res.is_none()); + + env.test_loop.run_for(Duration::seconds(40)); + + let view_client_handle = env.datas[0].view_client_sender.actor_handle(); + let view_client = env.test_loop.data.get_mut(&view_client_handle); + + let res = view_client.handle(StateRequestHeader { shard_id, sync_hash }); + assert!(res.is_some()); +} + +#[test] +fn test_state_request() { + init_test_logger(); + + let TestState { mut env, .. } = setup_initial_blockchain(4, HashMap::default()); + + env.test_loop.run_until( + |data| { + let handle = env.datas[0].client_sender.actor_handle(); + let client = &data.get(&handle).client; + let tip = client.chain.head().unwrap(); + if tip.epoch_id == Default::default() { + return false; + } + client.chain.get_sync_hash(&tip.last_block_hash).unwrap().is_some() + }, + Duration::seconds(20), + ); + let client_handle = env.datas[0].client_sender.actor_handle(); + let client = &env.test_loop.data.get(&client_handle).client; + let tip = client.chain.head().unwrap(); + let sync_hash = client.chain.get_sync_hash(&tip.last_block_hash).unwrap().unwrap(); + + spam_state_sync_header_reqs(&mut env, sync_hash); + env.shutdown_and_drain_remaining_events(Duration::seconds(3)); +} diff --git a/integration-tests/src/test_loop/utils/transactions.rs b/integration-tests/src/test_loop/utils/transactions.rs index 0f26decd100..ff95ec6db8a 100644 --- a/integration-tests/src/test_loop/utils/transactions.rs +++ b/integration-tests/src/test_loop/utils/transactions.rs @@ -5,6 +5,7 @@ use near_async::messaging::{CanSend, SendAsync}; use near_async::test_loop::TestLoopV2; use near_async::time::Duration; use near_client::test_utils::test_loop::ClientQueries; +use near_client::Client; use near_client::ProcessTxResponse; use near_network::client::ProcessTxRequest; use near_primitives::errors::InvalidTxError; @@ -31,6 +32,21 @@ pub(crate) struct BalanceMismatchError { pub actual: u128, } +// Transactions have to be built on top of some block in chain. 
To make +// sure all clients accept them, we select the head of the client with +// the smallest height. +pub(crate) fn get_anchor_hash(clients: &[&Client]) -> CryptoHash { + let (_, anchor_hash) = clients + .iter() + .map(|client| { + let head = client.chain.head().unwrap(); + (head.height, head.last_block_hash) + }) + .min_by_key(|&(height, _)| height) + .unwrap(); + anchor_hash +} + /// Execute money transfers within given `TestLoop` between given accounts. /// Runs chain long enough for the transfers to be optimistically executed. /// Used to generate state changes and check that chain is able to update @@ -74,17 +90,7 @@ pub(crate) fn execute_money_transfers( .map(|test_data| &data.get(&test_data.client_sender.actor_handle()).client) .collect_vec(); - // Transactions have to be built on top of some block in chain. To make - // sure all clients accept them, we select the head of the client with - // the smallest height. - let (_, anchor_hash) = clients - .iter() - .map(|client| { - let head = client.chain.head().unwrap(); - (head.height, head.last_block_hash) - }) - .min_by_key(|&(height, _)| height) - .unwrap(); + let anchor_hash = get_anchor_hash(&clients); let tx = SignedTransaction::send_money( // TODO: set correct nonce. diff --git a/integration-tests/src/tests/client/process_blocks.rs b/integration-tests/src/tests/client/process_blocks.rs index da184e38f79..c27f3066025 100644 --- a/integration-tests/src/tests/client/process_blocks.rs +++ b/integration-tests/src/tests/client/process_blocks.rs @@ -1676,12 +1676,33 @@ fn test_process_block_after_state_sync() { .nightshade_runtimes(&genesis) .build(); - let sync_height = epoch_length * 4 + 1; - for i in 1..=sync_height { - env.produce_block(0, i); - } - let sync_block = env.clients[0].chain.get_block_by_height(sync_height).unwrap(); - let sync_hash = *sync_block.hash(); + let mut sync_hash_attempts = 0; + let mut next_height = 1; + let sync_hash = loop { + let block = env.clients[0].produce_block(next_height).unwrap().unwrap(); + let block_hash = *block.hash(); + let prev_hash = *block.header().prev_hash(); + env.process_block(0, block, Provenance::PRODUCED); + next_height += 1; + + let epoch_height = + env.clients[0].epoch_manager.get_epoch_height_from_prev_block(&prev_hash).unwrap(); + if epoch_height < 4 { + continue; + } + let Some(sync_hash) = env.clients[0].chain.get_sync_hash(&block_hash).unwrap() else { + sync_hash_attempts += 1; + // This should not happen, but we guard against it defensively so we don't have some infinite loop in + // case of a bug + assert!(sync_hash_attempts <= 2, "sync_hash_attempts: {}", sync_hash_attempts); + continue; + }; + // Produce one more block after the sync hash is found so that the snapshot will be created + if sync_hash != block_hash { + break sync_hash; + } + }; + let sync_block = env.clients[0].chain.get_block(&sync_hash).unwrap(); let shard_id = ShardId::new(0); let header = env.clients[0].chain.compute_state_response_header(shard_id, sync_hash).unwrap(); @@ -1694,7 +1715,7 @@ fn test_process_block_after_state_sync() { .obtain_state_part(shard_id, &sync_prev_prev_hash, &state_root, PartId::new(0, 1)) .unwrap(); // reset cache - for i in epoch_length * 3 - 1..sync_height - 1 { + for i in epoch_length * 3 - 1..sync_block.header().height() - 1 { let block_hash = *env.clients[0].chain.get_block_by_height(i).unwrap().hash(); assert!(env.clients[0].chain.epoch_manager.get_epoch_start_height(&block_hash).is_ok()); } @@ -1704,7 +1725,7 @@ fn test_process_block_after_state_sync() { 
.runtime_adapter .apply_state_part(shard_id, &state_root, PartId::new(0, 1), &state_part, &epoch_id) .unwrap(); - let block = env.clients[0].produce_block(sync_height + 1).unwrap().unwrap(); + let block = env.clients[0].produce_block(next_height).unwrap().unwrap(); env.clients[0].process_block_test(block.into(), Provenance::PRODUCED).unwrap(); } @@ -2157,6 +2178,7 @@ fn test_data_reset_before_state_sync() { #[test] fn test_sync_hash_validity() { + init_test_logger(); let epoch_length = 5; let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1); genesis.config.epoch_length = epoch_length; @@ -2164,11 +2186,18 @@ fn test_sync_hash_validity() { for i in 1..19 { env.produce_block(0, i); } - for i in 0..19 { - let block_hash = *env.clients[0].chain.get_block_header_by_height(i).unwrap().hash(); - let res = env.clients[0].chain.check_sync_hash_validity(&block_hash); - println!("height {:?} -> {:?}", i, res); - assert_eq!(res.unwrap(), i == 0 || (i % epoch_length) == 1); + for i in 1..19 { + let header = env.clients[0].chain.get_block_header_by_height(i).unwrap(); + let block_hash = *header.hash(); + let valid = env.clients[0].chain.check_sync_hash_validity(&block_hash).unwrap(); + println!("height {} -> {}", i, valid); + if ProtocolFeature::StateSyncHashUpdate.enabled(PROTOCOL_VERSION) { + // This assumes that all shards have new chunks in every block, which should be true + // with TestEnv::produce_block() + assert_eq!(valid, (i % epoch_length) == 3); + } else { + assert_eq!(valid, header.epoch_id() != &EpochId::default() && (i % epoch_length) == 1); + } } let bad_hash = CryptoHash::from_str("7tkzFg8RHBmMw1ncRJZCCZAizgq4rwCftTKYLce8RU8t").unwrap(); let res = env.clients[0].chain.check_sync_hash_validity(&bad_hash); @@ -2399,7 +2428,9 @@ fn test_catchup_gas_price_change() { assert_eq!(env.clients[0].process_tx(tx, false, false), ProcessTxResponse::ValidTx); } - for i in 3..=6 { + // We go up to height 9 because height 6 is the first block of the new epoch, and we want at least + // two more blocks (plus one more for nodes to create snapshots) if syncing to the current epoch's state + for i in 3..=9 { let block = env.clients[0].produce_block(i).unwrap().unwrap(); blocks.push(block.clone()); env.process_block(0, block.clone(), Provenance::PRODUCED); @@ -2415,8 +2446,18 @@ fn test_catchup_gas_price_change() { .is_err()); // Simulate state sync - let sync_hash = *blocks[5].hash(); - assert_ne!(blocks[4].header().epoch_id(), blocks[5].header().epoch_id()); + + let sync_hash = + env.clients[0].chain.get_sync_hash(blocks.last().unwrap().hash()).unwrap().unwrap(); + let sync_block_idx = blocks + .iter() + .position(|b| *b.hash() == sync_hash) + .expect("block with hash matching sync hash not found"); + if sync_block_idx == 0 { + panic!("sync block should not be the first block produced"); + } + let sync_prev_block = &blocks[sync_block_idx - 1]; + assert!(env.clients[0].chain.check_sync_hash_validity(&sync_hash).unwrap()); let shard_id = ShardId::new(0); let state_sync_header = @@ -2466,10 +2507,14 @@ fn test_catchup_gas_price_change() { } } env.clients[1].chain.set_state_finalize(shard_id, sync_hash).unwrap(); - let chunk_extra_after_sync = - env.clients[1].chain.get_chunk_extra(blocks[4].hash(), &ShardUId::single_shard()).unwrap(); - let expected_chunk_extra = - env.clients[0].chain.get_chunk_extra(blocks[4].hash(), &ShardUId::single_shard()).unwrap(); + let chunk_extra_after_sync = env.clients[1] + .chain + .get_chunk_extra(sync_prev_block.hash(), 
&ShardUId::single_shard()) + .unwrap(); + let expected_chunk_extra = env.clients[0] + .chain + .get_chunk_extra(sync_prev_block.hash(), &ShardUId::single_shard()) + .unwrap(); // The chunk extra of the prev block of sync block should be the same as the node that it is syncing from assert_eq!(chunk_extra_after_sync, expected_chunk_extra); } @@ -3687,8 +3732,16 @@ mod contract_precompilation_tests { start_height, ); + let sync_height = if ProtocolFeature::StateSyncHashUpdate.enabled(PROTOCOL_VERSION) { + // `height` is one more than the start of the epoch. Produce two more blocks with chunks, + // and then one more than that so the node will generate the needed snapshot. + produce_blocks_from_height(&mut env, 3, height) - 2 + } else { + height - 1 + }; + // Perform state sync for the second client. - state_sync_on_height(&mut env, height - 1); + state_sync_on_height(&mut env, sync_height); // Check existence of contract in both caches. let contract_code = ContractCode::new(wasm_code.clone(), None); @@ -3708,7 +3761,7 @@ mod contract_precompilation_tests { // Check that contract function may be successfully called on the second client. // Note that we can't test that behaviour is the same on two clients, because // compile_module_cached_wasmer0 is cached by contract key via macro. - let block = env.clients[0].chain.get_block_by_height(EPOCH_LENGTH).unwrap(); + let block = env.clients[0].chain.get_block_by_height(sync_height - 1).unwrap(); let shard_uid = ShardUId::single_shard(); let shard_id = shard_uid.shard_id(); let chunk_extra = @@ -3752,6 +3805,7 @@ mod contract_precompilation_tests { #[test] #[cfg_attr(all(target_arch = "aarch64", target_vendor = "apple"), ignore)] fn test_two_deployments() { + init_integration_logger(); let num_clients = 2; let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1); @@ -3792,11 +3846,19 @@ mod contract_precompilation_tests { height, ); + let sync_height = if ProtocolFeature::StateSyncHashUpdate.enabled(PROTOCOL_VERSION) { + // `height` is one more than the start of the epoch. Produce two more blocks with chunks, + // and then one more than that so the node will generate the needed snapshot. + produce_blocks_from_height(&mut env, 3, height) - 2 + } else { + height - 1 + }; + // Perform state sync for the second client on the last produced height. - state_sync_on_height(&mut env, height - 1); + state_sync_on_height(&mut env, sync_height); let epoch_id = - *env.clients[0].chain.get_block_by_height(height - 1).unwrap().header().epoch_id(); + *env.clients[0].chain.get_block_by_height(sync_height).unwrap().header().epoch_id(); let runtime_config = env.get_runtime_config(0, epoch_id); let tiny_contract_key = get_contract_cache_key( *ContractCode::new(tiny_wasm_code.clone(), None).hash(), @@ -3863,13 +3925,21 @@ mod contract_precompilation_tests { env.clients[0].process_tx(delete_account_tx, false, false), ProcessTxResponse::ValidTx ); - height = produce_blocks_from_height(&mut env, EPOCH_LENGTH + 1, height); + // `height` is the first block of a new epoch (which has not been produced yet), + // so if we want to state sync the old way, we produce `EPOCH_LENGTH` + 1 new blocks + // in order to produce the first block of the next epoch. If we want to state sync the new + // way, we produce two more than that, plus one more so that the node will generate the needed snapshot. 
+ let sync_height = if ProtocolFeature::StateSyncHashUpdate.enabled(PROTOCOL_VERSION) { + produce_blocks_from_height(&mut env, EPOCH_LENGTH + 4, height) - 2 + } else { + produce_blocks_from_height(&mut env, EPOCH_LENGTH + 1, height) - 1 + }; // Perform state sync for the second client. - state_sync_on_height(&mut env, height - 1); + state_sync_on_height(&mut env, sync_height); let epoch_id = - *env.clients[0].chain.get_block_by_height(height - 1).unwrap().header().epoch_id(); + *env.clients[0].chain.get_block_by_height(sync_height).unwrap().header().epoch_id(); let runtime_config = env.get_runtime_config(0, epoch_id); let contract_key = get_contract_cache_key( *ContractCode::new(wasm_code.clone(), None).hash(), diff --git a/integration-tests/src/tests/client/state_dump.rs b/integration-tests/src/tests/client/state_dump.rs index 298d9a74cce..f3f2cb7d5a3 100644 --- a/integration-tests/src/tests/client/state_dump.rs +++ b/integration-tests/src/tests/client/state_dump.rs @@ -244,14 +244,7 @@ fn run_state_sync_with_dumped_parts( let epoch_info = epoch_manager.get_epoch_info(&epoch_id).unwrap(); let epoch_height = epoch_info.epoch_height(); - let sync_block_height = (epoch_length * epoch_height + 1) as usize; - let sync_hash = *blocks[sync_block_height - 1].hash(); - - // the block at sync_block_height should be the start of an epoch - assert_ne!( - blocks[sync_block_height - 1].header().epoch_id(), - blocks[sync_block_height - 2].header().epoch_id() - ); + let sync_hash = env.clients[0].chain.get_sync_hash(final_block_hash).unwrap().unwrap(); assert!(env.clients[0].chain.check_sync_hash_validity(&sync_hash).unwrap()); let state_sync_header = env.clients[0].chain.get_state_response_header(shard_id, sync_hash).unwrap(); @@ -375,7 +368,7 @@ fn run_state_sync_with_dumped_parts( } } -/// This test verifies that after state sync, the syncing node has the data that corresponds to the state of the epoch previous to the dumping node's final block. +/// This test verifies that after state sync, the syncing node has the data that corresponds to the state of the epoch previous (or current) to the dumping node's final block. 
/// Specifically, it tests that the above holds true in both conditions: /// - the dumping node's head is in new epoch but final block is not; /// - the dumping node's head and final block are in same epoch diff --git a/integration-tests/src/tests/client/sync_state_nodes.rs b/integration-tests/src/tests/client/sync_state_nodes.rs index 9a61db938d3..e94fcf01053 100644 --- a/integration-tests/src/tests/client/sync_state_nodes.rs +++ b/integration-tests/src/tests/client/sync_state_nodes.rs @@ -21,7 +21,7 @@ use near_primitives::state_sync::StatePartKey; use near_primitives::transaction::SignedTransaction; use near_primitives::types::{BlockId, BlockReference, EpochId, EpochReference, ShardId}; use near_primitives::utils::MaybeValidated; -use near_primitives::version::ProtocolFeature; +use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION}; use near_store::adapter::StoreUpdateAdapter; use near_store::DBCol; use nearcore::test_utils::TestEnvNightshadeSetupExt; @@ -580,12 +580,12 @@ fn test_dump_epoch_missing_chunk_in_last_block() { let epoch_length = 10; let shard_id = ShardId::new(0); - for num_last_chunks_missing in 0..6 { - assert!(num_last_chunks_missing < epoch_length); + for num_chunks_missing in 0..6 { + assert!(num_chunks_missing < epoch_length); tracing::info!( target: "test", - ?num_last_chunks_missing, + ?num_chunks_missing, "starting test_dump_epoch_missing_chunk_in_last_block" ); let mut genesis = @@ -603,8 +603,36 @@ fn test_dump_epoch_missing_chunk_in_last_block() { let signer = InMemorySigner::from_seed("test0".parse().unwrap(), KeyType::ED25519, "test0") .into(); - let target_height = epoch_length + 1; - for i in 1..=target_height { + + let next_epoch_start = epoch_length + 1; + let protocol_version = env.clients[0] .epoch_manager .get_epoch_protocol_version(&EpochId::default()) .unwrap(); + // Note that the height to skip here is the height at which we don't produce chunks for the next block, so really + // one before the block height that will have no chunks. The sync_height is the height of the sync_hash block. + let (start_skipping_chunks, sync_height) = + if ProtocolFeature::StateSyncHashUpdate.enabled(protocol_version) { + // At the beginning of the epoch, produce one block with chunks and then start skipping chunks. + let start_skipping_chunks = next_epoch_start + 1; + // Then we will skip `num_chunks_missing` chunks, and produce one more with chunks, which will be the sync height. + let sync_height = start_skipping_chunks + num_chunks_missing + 1; + (start_skipping_chunks, sync_height) + } else { + // here the sync hash is the hash of the first block of the epoch + let sync_height = next_epoch_start; + // skip chunks before the epoch start, but not including the one right before the epoch's start. + let start_skipping_chunks = sync_height - num_chunks_missing - 1; + (start_skipping_chunks, sync_height) + }; + + // produce chunks right before the sync hash block, so that the sync hash block itself will have new chunks. 
+ let stop_skipping_chunks = sync_height - 1; + + assert!(sync_height < 2 * epoch_length + 1); + + // Produce blocks up to sync_height + 1 to give nodes a chance to create the necessary state snapshot + for i in 1..=sync_height + 1 { tracing::info!( target: "test", height=i, @@ -613,9 +641,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() { let block = env.clients[0].produce_block(i).unwrap().unwrap(); blocks.push(block.clone()); - if (i % epoch_length) != 0 - && epoch_length - (i % epoch_length) <= num_last_chunks_missing - { + if i >= start_skipping_chunks && i < stop_skipping_chunks { // Don't produce chunks for the last blocks of an epoch. env.clients[0] .process_block_test_no_produce_chunk( @@ -650,13 +676,18 @@ fn test_dump_epoch_missing_chunk_in_last_block() { // Simulate state sync tracing::info!(target: "test", "state sync - get parts"); - // No blocks were skipped, therefore we can compute the block height of the first block of the current epoch. - let sync_hash_height = ((target_height / epoch_length) * epoch_length + 1) as usize; - let sync_hash = *blocks[sync_hash_height].hash(); - assert_ne!( - blocks[sync_hash_height].header().epoch_id(), - blocks[sync_hash_height - 1].header().epoch_id() - ); + let sync_hash = + env.clients[0].chain.get_sync_hash(blocks.last().unwrap().hash()).unwrap().unwrap(); + let sync_block_idx = blocks + .iter() + .position(|b| *b.hash() == sync_hash) + .expect("block with hash matching sync hash not found"); + let sync_block = &blocks[sync_block_idx]; + if sync_block_idx == 0 { + panic!("sync block should not be the first block produced"); + } + let sync_prev_block = &blocks[sync_block_idx - 1]; + let sync_prev_height_included = sync_prev_block.chunks()[0].height_included(); let state_sync_header = env.clients[0].chain.get_state_response_header(shard_id, sync_hash).unwrap(); @@ -672,7 +703,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() { tracing::info!(target: "test", "state sync - apply parts"); env.clients[1].chain.reset_data_pre_state_sync(sync_hash).unwrap(); - let epoch_id = blocks.last().unwrap().header().epoch_id(); + let epoch_id = sync_block.header().epoch_id(); for i in 0..num_parts { env.clients[1] .runtime_adapter @@ -724,7 +755,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() { &state_sync_header.chunk_prev_state_root(), PartId::new(part_id, num_parts), &part, - blocks[sync_hash_height].header().epoch_id(), + epoch_id, ) .unwrap(); } @@ -733,9 +764,11 @@ fn test_dump_epoch_missing_chunk_in_last_block() { tracing::info!(target: "test", "state sync - set state finalize"); env.clients[1].chain.set_state_finalize(shard_id, sync_hash).unwrap(); - let last_chunk_height = epoch_length - num_last_chunks_missing; - for height in 1..epoch_length { - if height < last_chunk_height { + // We apply chunks from the block with height `sync_prev_height_included` up to `sync_prev`. So there should + // be chunk extras for those all equal to the chunk extra for `sync_prev_height_included`, and no chunk extras + // for any other height. 
+ for height in 1..sync_height { + if height < sync_prev_height_included || height >= sync_height { assert!(env.clients[1] .chain .get_chunk_extra(blocks[height as usize].hash(), &ShardUId::single_shard()) @@ -750,7 +783,7 @@ fn test_dump_epoch_missing_chunk_in_last_block() { { height } else { - last_chunk_height + sync_prev_height_included }; let expected_chunk_extra = env.clients[0] .chain @@ -829,7 +862,14 @@ fn test_state_sync_headers() { }; tracing::info!(epoch_start_height, "got epoch_start_height"); - let block_id = BlockReference::BlockId(BlockId::Height(epoch_start_height)); + let sync_height = if ProtocolFeature::StateSyncHashUpdate.enabled(PROTOCOL_VERSION) + { + // here since there's only one block/chunk producer, we assume that no blocks will be missing chunks. + epoch_start_height + 2 + } else { + epoch_start_height + }; + let block_id = BlockReference::BlockId(BlockId::Height(sync_height)); let block_view = view_client1.send(GetBlock(block_id).with_span_context()).await; let Ok(Ok(block_view)) = block_view else { return ControlFlow::Continue(()); @@ -1007,7 +1047,14 @@ fn test_state_sync_headers_no_tracked_shards() { return ControlFlow::Continue(()); } - let block_id = BlockReference::BlockId(BlockId::Height(epoch_start_height)); + let sync_height = if ProtocolFeature::StateSyncHashUpdate.enabled(PROTOCOL_VERSION) + { + // here since there's only one block/chunk producer, we assume that no blocks will be missing chunks. + epoch_start_height + 2 + } else { + epoch_start_height + }; + let block_id = BlockReference::BlockId(BlockId::Height(sync_height)); let block_view = view_client2.send(GetBlock(block_id).with_span_context()).await; let Ok(Ok(block_view)) = block_view else { return ControlFlow::Continue(()); diff --git a/nearcore/src/migrations.rs b/nearcore/src/migrations.rs index 83c26937779..36b56c45018 100644 --- a/nearcore/src/migrations.rs +++ b/nearcore/src/migrations.rs @@ -87,6 +87,7 @@ impl<'a> near_store::StoreMigrator for Migrator<'a> { 38 => near_store::migrations::migrate_38_to_39(store), 39 => near_store::migrations::migrate_39_to_40(store), 40 => near_store::migrations::migrate_40_to_41(store), + 41 => near_store::migrations::migrate_41_to_42(store), DB_VERSION.. => unreachable!(), } } diff --git a/nearcore/src/state_sync.rs b/nearcore/src/state_sync.rs index 742451bc5db..5bee704b824 100644 --- a/nearcore/src/state_sync.rs +++ b/nearcore/src/state_sync.rs @@ -15,7 +15,6 @@ use near_client::sync::external::{ external_storage_location_directory, get_part_id_from_filename, is_part_filename, ExternalConnection, }; -use near_client::sync::state::get_epoch_start_sync_hash; use near_epoch_manager::shard_tracker::ShardTracker; use near_epoch_manager::EpochManagerAdapter; use near_primitives::hash::CryptoHash; @@ -269,6 +268,11 @@ fn get_current_state( err })?; + let new_sync_hash = match new_sync_hash { + Some(h) => h, + None => return Ok(StateDumpAction::Wait), + }; + if Some(&new_epoch_id) == was_last_epoch_done.as_ref() { tracing::debug!(target: "state_sync_dump", ?shard_id, ?was_last_epoch_done, ?new_epoch_id, new_epoch_height, ?new_sync_hash, "latest epoch is done. No new epoch to dump. Idle"); Ok(StateDumpAction::Wait) @@ -652,7 +656,7 @@ struct LatestEpochInfo { prev_epoch_id: EpochId, epoch_id: EpochId, epoch_height: EpochHeight, - sync_hash: CryptoHash, + sync_hash: Option, } /// return epoch_id and sync_hash of the latest complete epoch available locally. 
@@ -666,12 +670,13 @@ fn get_latest_epoch( let hash = head.last_block_hash; let header = chain.get_block_header(&hash)?; let final_hash = header.last_final_block(); - let sync_hash = get_epoch_start_sync_hash(chain, final_hash)?; + let sync_hash = chain.get_sync_hash(final_hash)?; let final_block_header = chain.get_block_header(&final_hash)?; let epoch_id = *final_block_header.epoch_id(); let epoch_info = epoch_manager.get_epoch_info(&epoch_id)?; let prev_epoch_id = epoch_manager.get_prev_epoch_id_from_prev_block(&head.prev_block_hash)?; let epoch_height = epoch_info.epoch_height(); + tracing::debug!(target: "state_sync_dump", ?final_hash, ?sync_hash, ?epoch_id, epoch_height, "get_latest_epoch"); Ok(LatestEpochInfo { prev_epoch_id, epoch_id, epoch_height, sync_hash }) diff --git a/pytest/tests/sanity/state_parts_dump_check.py b/pytest/tests/sanity/state_parts_dump_check.py index 815dd4881db..77e576b4792 100644 --- a/pytest/tests/sanity/state_parts_dump_check.py +++ b/pytest/tests/sanity/state_parts_dump_check.py @@ -91,14 +91,6 @@ def main(): val for metric, val in metrics.items() if 'state_sync_dump_check_process_is_up' in metric ]) == NUM_SHARDS, f"Dumper process missing for some shards. {metrics}" - assert sum([ - val for metric, val in metrics.items() - if 'state_sync_dump_check_num_parts_dumped' in metric - ]) == 0, f"No node was supposed to dump parts. {metrics}" - assert sum([ - val for metric, val in metrics.items() - if 'state_sync_dump_check_num_header_dumped' in metric - ]) == 0, f"No node was supposed to dump headers. {metrics}" # wait for 10 more blocks. list(islice(poll_blocks(boot_node), 10)) diff --git a/tools/mock-node/src/setup.rs b/tools/mock-node/src/setup.rs index e9e8ed06cec..8a7aa530bc3 100644 --- a/tools/mock-node/src/setup.rs +++ b/tools/mock-node/src/setup.rs @@ -277,192 +277,3 @@ pub fn setup_mock_node( MockNode { target_height, mock_peer, rpc_client } } - -#[cfg(test)] -mod tests { - use crate::setup::{setup_mock_node, MockNode}; - use crate::MockNetworkConfig; - use actix::{Actor, System}; - use futures::{future, FutureExt}; - use near_actix_test_utils::{run_actix, spawn_interruptible}; - use near_chain::{ChainStore, ChainStoreAccess}; - use near_chain_configs::{Genesis, NEAR_BASE}; - use near_client::{GetBlock, ProcessTxRequest}; - use near_crypto::{InMemorySigner, KeyType}; - use near_epoch_manager::{EpochManager, EpochManagerAdapter}; - use near_network::tcp; - use near_network::test_utils::{wait_or_timeout, WaitOrTimeoutActor}; - use near_o11y::testonly::init_integration_logger; - use near_o11y::WithSpanContextExt; - use near_primitives::transaction::SignedTransaction; - use near_primitives::types::ShardId; - use near_store::test_utils::gen_account_from_alphabet; - use nearcore::{load_test_config, start_with_config}; - use rand::thread_rng; - use std::ops::ControlFlow; - use std::sync::{Arc, RwLock}; - use std::time::Duration; - - // Test the basic mocknet setup. - // This test first starts a localnet with one validator node that generates 2 epochs of blocks - // to generate a chain history. - // Then start a mock network with this chain history and test that the client in the mock network can catch up these 2 epochs. - // The localnet needs to have state snapshots enabled. It copies state from - // one instance to another by using the state sync mechanism, which relies - // on the flat storage snapshots. 
- #[test] - fn test_mock_node_basic() { - init_integration_logger(); - - // first set up a network with only one validator and generate some blocks - let mut genesis = - Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1); - let epoch_length = 50; - genesis.config.epoch_length = epoch_length; - let mut near_config = - load_test_config("test0", tcp::ListenerAddr::reserve_for_test(), genesis.clone()); - near_config.client_config.min_num_peers = 0; - near_config.config.store.state_snapshot_enabled = true; - near_config.config.tracked_shards = vec![ShardId::new(0)]; // Track all shards. - - let dir = tempfile::Builder::new().prefix("test0").tempdir().unwrap(); - let path1 = dir.path(); - run_actix(async move { - let nearcore::NearNode { view_client, client, .. } = - start_with_config(path1, near_config).expect("start_with_config"); - - let view_client1 = view_client; - let nonce = Arc::new(RwLock::new(10)); - WaitOrTimeoutActor::new( - Box::new(move |_ctx| { - let nonce = nonce.clone(); - let client1 = client.clone(); - let actor = view_client1.send(GetBlock::latest().with_span_context()); - let actor = actor.then(move |res| { - if let Ok(Ok(block)) = res { - let next_nonce = *nonce.read().unwrap(); - if next_nonce < 100 { - WaitOrTimeoutActor::new( - Box::new(move |_ctx| { - let signer0 = InMemorySigner::from_seed( - "test1".parse().unwrap(), - KeyType::ED25519, - "test1", - ); - let mut rng = thread_rng(); - let transaction = SignedTransaction::create_account( - next_nonce, - "test1".parse().unwrap(), - gen_account_from_alphabet(&mut rng, b"abcdefghijklmn"), - 5 * NEAR_BASE, - signer0.public_key.clone(), - &signer0.into(), - block.header.hash, - ); - spawn_interruptible( - client1 - .send( - ProcessTxRequest { - transaction, - is_forwarded: false, - check_only: false, - } - .with_span_context(), - ) - .then(move |_res| future::ready(())), - ); - }), - 100, - 30000, - ) - .start(); - *nonce.write().unwrap() = next_nonce + 1; - } - - // This is the flaky part. - // The node needs to stop as late into an epoch as - // possible without going over into the next epoch. - let expected_height = epoch_length * 3 - 5; - if block.header.height >= expected_height { - tracing::info!( - block_height = block.header.height, - expected_height, - "Time to stop" - ); - System::current().stop() - } - } - future::ready(()) - }); - spawn_interruptible(actor); - }), - // Keep this number low to ensure the node is stopped late in - // the epoch without going into the next epoch. - 100, - 60000, - ) - .start(); - }); - - // start the mock network to simulate a new node "test1" to sync up - // start the client at height 10 (end of the first epoch) - let dir1 = tempfile::Builder::new().prefix("test1").tempdir().unwrap(); - let mut near_config1 = load_test_config("", tcp::ListenerAddr::reserve_for_test(), genesis); - near_config1.client_config.min_num_peers = 1; - near_config1.client_config.tracked_shards = vec![ShardId::new(0)]; // Track all shards. 
- near_config1.config.store.state_snapshot_enabled = true; - let network_config = MockNetworkConfig::with_delay(Duration::from_millis(10)); - - let client_start_height = { - tracing::info!(target: "mock_node", store_path = ?dir.path(), "Opening the created store to get client_start_height"); - let store = near_store::NodeStorage::opener( - dir.path(), - near_config1.config.archive, - &near_config1.config.store, - None, - ) - .open() - .unwrap() - .get_hot_store(); - let epoch_manager = - EpochManager::new_arc_handle(store.clone(), &near_config1.genesis.config, None); - let chain_store = ChainStore::new( - store, - near_config1.genesis.config.genesis_height, - near_config1.client_config.save_trie_changes, - ); - let network_head_hash = chain_store.head().unwrap().last_block_hash; - let last_epoch_start_height = - epoch_manager.get_epoch_start_height(&network_head_hash).unwrap(); - tracing::info!(target: "mock_node", ?network_head_hash, last_epoch_start_height); - // Needs to be the last block of an epoch. - last_epoch_start_height - 1 - }; - tracing::info!(target: "mock_node", client_start_height); - - run_actix(async { - let MockNode { rpc_client, .. } = setup_mock_node( - dir1.path(), - dir.path(), - near_config1, - &network_config, - client_start_height, - None, - None, - false, - tcp::ListenerAddr::reserve_for_test(), - ); - wait_or_timeout(100, 60000, || async { - if let Ok(status) = rpc_client.status().await { - if status.sync_info.latest_block_height >= client_start_height { - System::current().stop(); - return ControlFlow::Break(()); - } - } - ControlFlow::Continue(()) - }) - .await - .unwrap(); - }) - } -} diff --git a/tools/state-viewer/src/state_parts.rs b/tools/state-viewer/src/state_parts.rs index 2fb868ccac8..5d887fee2f1 100644 --- a/tools/state-viewer/src/state_parts.rs +++ b/tools/state-viewer/src/state_parts.rs @@ -6,7 +6,6 @@ use near_client::sync::external::{ external_storage_location_directory, get_num_parts_from_filename, ExternalConnection, StateFileType, }; -use near_client::sync::state::get_epoch_start_sync_hash; use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; use near_epoch_manager::EpochManager; use near_primitives::challenge::PartialState; @@ -342,7 +341,13 @@ async fn load_state_parts( let epoch = chain.epoch_manager.get_epoch_info(&epoch_id).unwrap(); let sync_hash = get_any_block_hash_of_epoch(&epoch, chain); - let sync_hash = get_epoch_start_sync_hash(chain, &sync_hash).unwrap(); + let sync_hash = match chain.get_sync_hash(&sync_hash).unwrap() { + Some(h) => h, + None => { + tracing::warn!(target: "state-parts", ?epoch_id, "sync hash not yet known"); + return; + } + }; let state_header = chain.get_state_response_header(shard_id, sync_hash).unwrap(); let state_root = state_header.chunk_prev_state_root(); @@ -443,7 +448,13 @@ async fn dump_state_parts( let epoch_id = epoch_selection.to_epoch_id(store, chain); let epoch = chain.epoch_manager.get_epoch_info(&epoch_id).unwrap(); let sync_hash = get_any_block_hash_of_epoch(&epoch, chain); - let sync_hash = get_epoch_start_sync_hash(chain, &sync_hash).unwrap(); + let sync_hash = match chain.get_sync_hash(&sync_hash).unwrap() { + Some(h) => h, + None => { + tracing::warn!(target: "state-parts", ?epoch_id, "sync hash not yet known"); + return; + } + }; let sync_block_header = chain.get_block_header(&sync_hash).unwrap(); let sync_prev_header = chain.get_previous_header(&sync_block_header).unwrap(); let sync_prev_prev_hash = sync_prev_header.prev_hash(); @@ -545,7 +556,13 @@ fn read_state_header( 
let epoch = chain.epoch_manager.get_epoch_info(&epoch_id).unwrap(); let sync_hash = get_any_block_hash_of_epoch(&epoch, chain); - let sync_hash = get_epoch_start_sync_hash(chain, &sync_hash).unwrap(); + let sync_hash = match chain.get_sync_hash(&sync_hash).unwrap() { + Some(h) => h, + None => { + tracing::warn!(target: "state-parts", ?epoch_id, "sync hash not yet known"); + return; + } + }; let state_header = chain.chain_store().get_state_header(shard_id, sync_hash); tracing::info!(target: "state-parts", ?epoch_id, ?sync_hash, ?state_header);