From c87354da55ff41c0655a7ec388d1d36aebc3268f Mon Sep 17 00:00:00 2001 From: posvyatokum Date: Fri, 3 Nov 2023 09:40:17 +0000 Subject: [PATCH] feat(epoch-sync): update EpochSyncInfo (#10081) #10030 Changing EpochSyncInfo to contain all data outlined in https://docs.google.com/document/d/14Itc9Hs7ewTRmcGANid9UCaYcJzaLzzM7FsvYYIFKDY/edit#heading=h.oixofhftbu1p Blocking generation of epoch block hashes is a known problem. IMO this is not a problem for header sync. Still, during regular block processing it is not ideal to spend additional time reading something that should have been saved in memory during the epoch. I will change this part after I have an epoch sync node setup that allows for realistic delay estimation. In the meantime the only optimisation is to collect this information from BlockInfos rather than from BlockHeaders, as BlockInfos is a much smaller column. --------- Co-authored-by: nikurt <86772482+nikurt@users.noreply.github.com> --- chain/chain/Cargo.toml | 2 +- chain/chain/src/chain.rs | 122 +++++++++++++++++------ chain/chain/src/test_utils/kv_runtime.rs | 8 ++ chain/epoch-manager/src/adapter.rs | 18 ++++ chain/epoch-manager/src/lib.rs | 34 +++++++ core/primitives/src/epoch_manager.rs | 25 ++--- 6 files changed, 165 insertions(+), 44 deletions(-) diff --git a/chain/chain/Cargo.toml b/chain/chain/Cargo.toml index a1dfbe53f49..23a7b7f125f 100644 --- a/chain/chain/Cargo.toml +++ b/chain/chain/Cargo.toml @@ -49,7 +49,7 @@ byzantine_asserts = [] expensive_tests = [] test_features = [] no_cache = ["near-store/no_cache"] -new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync"] +new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync", "near-epoch-manager/new_epoch_sync"] protocol_feature_reject_blocks_with_outdated_protocol_version = [ "near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version", diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 57e1cf914ef..7ef4df7c9df 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -39,10 +39,7 @@ use near_primitives::challenge::{ }; use near_primitives::checked_feature; #[cfg(feature = "new_epoch_sync")] -use near_primitives::epoch_manager::{ - block_info::BlockInfo, - epoch_sync::{BlockHeaderPair, EpochSyncInfo}, -}; +use near_primitives::epoch_manager::{block_info::BlockInfo, epoch_sync::EpochSyncInfo}; use near_primitives::errors::EpochError; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::merkle::{ @@ -1964,11 +1961,11 @@ impl Chain { #[cfg(feature = "new_epoch_sync")] { - // At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache. + // At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache because of `add_validator_proposals` call. let block_info = self.epoch_manager.get_block_info(header.hash())?; let mut chain_update = self.chain_update(); - chain_update.save_epoch_sync_info(header.epoch_id(), header, &block_info)?; + chain_update.save_epoch_sync_info(&block_info)?; chain_update.commit()?; } } @@ -5500,9 +5497,9 @@ impl<'a> ChainUpdate<'a> { #[cfg(feature = "new_epoch_sync")] { - // BlockInfo should be already recorded in epoch_manager cache + // BlockInfo should be already recorded in epoch_manager cache because of `add_validator_proposals` call let block_info = self.epoch_manager.get_block_info(block.hash())?; - self.save_epoch_sync_info(block.header().epoch_id(), block.header(), &block_info)?; + self.save_epoch_sync_info(&block_info)?; } // Add validated block to the db, even if it's not the canonical fork. @@ -5883,19 +5880,15 @@ impl<'a> ChainUpdate<'a> { /// If the block is the last one in the epoch /// construct and record `EpochSyncInfo` to `self.chain_store_update`. #[cfg(feature = "new_epoch_sync")] - fn save_epoch_sync_info( - &mut self, - epoch_id: &EpochId, - last_block_header: &BlockHeader, - last_block_info: &BlockInfo, - ) -> Result<(), Error> { - if self.epoch_manager.is_next_block_epoch_start(last_block_header.hash())? { + fn save_epoch_sync_info(&mut self, last_block_info: &BlockInfo) -> Result<(), Error> { + let epoch_id = last_block_info.epoch_id(); + if self.epoch_manager.is_next_block_epoch_start(last_block_info.hash())? { let mut store_update = self.chain_store_update.store().store_update(); store_update .set_ser( DBCol::EpochSyncInfo, epoch_id.as_ref(), - &self.create_epoch_sync_info(last_block_header, last_block_info)?, + &self.create_epoch_sync_info(last_block_info)?, ) .map_err(EpochError::from)?; self.chain_store_update.merge(store_update); @@ -5903,9 +5896,14 @@ impl<'a> ChainUpdate<'a> { Ok(()) } - /// Create a pair of `BlockHeader`s necessary to create `BlockInfo` for `block_hash` + /// Create a pair of `BlockHeader`s necessary to create `BlockInfo` for `block_hash`: + /// - header for `block_hash` + /// - header for `last_final_block` of `block_hash` header #[cfg(feature = "new_epoch_sync")] - fn get_header_pair(&self, block_hash: &CryptoHash) -> Result { + fn get_header_pair( + &self, + block_hash: &CryptoHash, + ) -> Result<(BlockHeader, BlockHeader), Error> { let header = self.chain_store_update.get_block_header(block_hash)?; // `block_hash` can correspond to genesis block, for which there is no last final block recorded, // because `last_final_block` for genesis is `CryptoHash::default()` @@ -5919,25 +5917,87 @@ impl<'a> ChainUpdate<'a> { self.chain_store_update.get_block_header(header.last_final_block())? } }; - Ok(BlockHeaderPair { header, last_finalised_header }) + Ok((header, last_finalised_header)) } - /// Data that is necessary to prove Epoch in new Epoch Sync. + /// For epoch sync we need to save: + /// - first header of the epoch + /// - last header of the epoch + /// - prev last header of the epoch + /// - every header on chain from `last_final_block` to the end of the epoch + /// - (*) header of the `last_final_block` for each of previously mentioned headers + /// + /// Because headers may repeat between those points, we use one `HashMap` to store them indexed by hash. + /// + /// Headers not marked with (*) need to be saved on the syncing node. + /// Headers marked with (*) only needed for `EpochSyncInfo` validation. #[cfg(feature = "new_epoch_sync")] - fn create_epoch_sync_info( + fn get_epoch_sync_info_headers( &self, - last_block_header: &BlockHeader, last_block_info: &BlockInfo, - ) -> Result { - let last = self.get_header_pair(last_block_header.hash())?; - let prev_last = self.get_header_pair(last_block_header.prev_hash())?; - let first = self.get_header_pair(last_block_info.epoch_first_block())?; - let epoch_info = self.epoch_manager.get_epoch_info(last_block_info.epoch_id())?; + ) -> Result<(HashMap, HashSet), Error> { + let mut headers = HashMap::new(); + let mut headers_to_save = HashSet::new(); + + let mut add_header = |block_hash: &CryptoHash| -> Result<(), Error> { + let (header, last_finalised_header) = self.get_header_pair(block_hash)?; + headers.insert(*header.hash(), header); + headers.insert(*last_finalised_header.hash(), last_finalised_header); + headers_to_save.insert(*block_hash); + Ok(()) + }; + + add_header(last_block_info.epoch_first_block())?; + add_header(last_block_info.hash())?; + add_header(last_block_info.prev_hash())?; + + // If we didn't add `last_final_block_hash` yet, go down the chain until we find it. + if last_block_info.hash() != last_block_info.last_final_block_hash() + && last_block_info.prev_hash() != last_block_info.last_final_block_hash() + { + let mut current_header = + self.chain_store_update.get_block_header(last_block_info.prev_hash())?; + while current_header.hash() != last_block_info.last_final_block_hash() { + // This only should happen if BlockInfo data is incorrect. + // Without this assert same BlockInfo will cause infinite loop instead of crash with a message. + assert!( + current_header.height() > last_block_info.last_finalized_height(), + "Reached block at height {:?} with hash {:?} from {:?}", + current_header.height(), + current_header.hash(), + last_block_info + ); + + // current_header was already added, as we start from current_header = prev_header. + current_header = + self.chain_store_update.get_block_header(current_header.prev_hash())?; + add_header(current_header.hash())?; + } + } + + Ok((headers, headers_to_save)) + } + + /// Data that is necessary to prove Epoch in new Epoch Sync. + #[cfg(feature = "new_epoch_sync")] + fn create_epoch_sync_info(&self, last_block_info: &BlockInfo) -> Result { + let mut all_block_hashes = self.epoch_manager.get_all_epoch_hashes(last_block_info)?; + all_block_hashes.reverse(); + + let (headers, headers_to_save) = self.get_epoch_sync_info_headers(last_block_info)?; + + let epoch_id = last_block_info.epoch_id(); + let next_epoch_id = self.epoch_manager.get_next_epoch_id(last_block_info.hash())?; + let next_next_epoch_id = EpochId(*last_block_info.hash()); + Ok(EpochSyncInfo { - last, - prev_last, - first, - block_producers: epoch_info.validators_iter().collect(), + all_block_hashes, + headers, + headers_to_save, + epoch_info: (*self.epoch_manager.get_epoch_info(epoch_id)?).clone(), + next_epoch_info: (*self.epoch_manager.get_epoch_info(&next_epoch_id)?).clone(), + next_next_epoch_info: (*self.epoch_manager.get_epoch_info(&next_next_epoch_id)?) + .clone(), }) } } diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 84ff449a541..1f39842852f 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -945,6 +945,14 @@ impl EpochManagerAdapter for MockEpochManager { let next_shard_layout = self.get_shard_layout(&next_epoch_id)?; Ok(shard_layout != next_shard_layout) } + + #[cfg(feature = "new_epoch_sync")] + fn get_all_epoch_hashes( + &self, + _last_block_info: &BlockInfo, + ) -> Result, EpochError> { + Ok(vec![]) + } } impl RuntimeAdapter for KeyValueRuntime { diff --git a/chain/epoch-manager/src/adapter.rs b/chain/epoch-manager/src/adapter.rs index a4674e33590..1b79eb2e96d 100644 --- a/chain/epoch-manager/src/adapter.rs +++ b/chain/epoch-manager/src/adapter.rs @@ -376,6 +376,15 @@ pub trait EpochManagerAdapter: Send + Sync { ) -> Result; fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result; + + /// Returns a vector of all hashes in the epoch ending with `last_block_info`. + /// Only return blocks on chain of `last_block_info`. + /// Hashes are returned in the order from the last block to the first block. + #[cfg(feature = "new_epoch_sync")] + fn get_all_epoch_hashes( + &self, + last_block_info: &BlockInfo, + ) -> Result, EpochError>; } impl EpochManagerAdapter for EpochManagerHandle { @@ -944,4 +953,13 @@ impl EpochManagerAdapter for EpochManagerHandle { let epoch_manager = self.read(); epoch_manager.will_shard_layout_change(parent_hash) } + + #[cfg(feature = "new_epoch_sync")] + fn get_all_epoch_hashes( + &self, + last_block_info: &BlockInfo, + ) -> Result, EpochError> { + let epoch_manager = self.read(); + epoch_manager.get_all_epoch_hashes(last_block_info) + } } diff --git a/chain/epoch-manager/src/lib.rs b/chain/epoch-manager/src/lib.rs index e9b6b5e1566..0093fa95247 100644 --- a/chain/epoch-manager/src/lib.rs +++ b/chain/epoch-manager/src/lib.rs @@ -1843,4 +1843,38 @@ impl EpochManager { Ok(None) } } + + #[cfg(feature = "new_epoch_sync")] + pub fn get_all_epoch_hashes( + &self, + last_block_info: &BlockInfo, + ) -> Result, EpochError> { + let _span = + tracing::debug_span!(target: "epoch_manager", "get_all_epoch_hashes", ?last_block_info) + .entered(); + + let mut result = vec![]; + let first_epoch_block_height = + self.get_block_info(last_block_info.epoch_first_block())?.height(); + let mut current_block_info = last_block_info.clone(); + while current_block_info.hash() != last_block_info.epoch_first_block() { + // Check that we didn't reach previous epoch. + // This only should happen if BlockInfo data is incorrect. + // Without this assert same BlockInfo will cause infinite loop instead of crash with a message. + assert!( + current_block_info.height() > first_epoch_block_height, + "Reached {:?} from {:?} when first epoch height is {:?}", + current_block_info, + last_block_info, + first_epoch_block_height + ); + + result.push(*current_block_info.hash()); + current_block_info = (*self.get_block_info(current_block_info.prev_hash())?).clone(); + } + // First block of an epoch is not covered by the while loop. + result.push(*current_block_info.hash()); + + Ok(result) + } } diff --git a/core/primitives/src/epoch_manager.rs b/core/primitives/src/epoch_manager.rs index 393a00e1cdc..8cc7343adf7 100644 --- a/core/primitives/src/epoch_manager.rs +++ b/core/primitives/src/epoch_manager.rs @@ -1232,22 +1232,23 @@ pub enum SlashState { #[cfg(feature = "new_epoch_sync")] pub mod epoch_sync { use crate::block_header::BlockHeader; - use crate::types::validator_stake::ValidatorStake; + use crate::epoch_manager::epoch_info::EpochInfo; use borsh::{BorshDeserialize, BorshSerialize}; - - #[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)] - pub struct BlockHeaderPair { - pub header: BlockHeader, - pub last_finalised_header: BlockHeader, - } + use near_primitives_core::hash::CryptoHash; + use std::collections::{HashMap, HashSet}; /// Struct to keep all the info that is transferred for one epoch during Epoch Sync. #[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)] pub struct EpochSyncInfo { - /// None is only used for corner case of the first epoch - pub first: BlockHeaderPair, - pub last: BlockHeaderPair, - pub prev_last: BlockHeaderPair, - pub block_producers: Vec, + /// All block hashes of this epoch. In order of production. + pub all_block_hashes: Vec, + /// All headers relevant to epoch sync. + /// Contains epoch headers that need to be saved + supporting headers needed for validation. + pub headers: HashMap, + /// Hashes of headers that need to be validated and saved. + pub headers_to_save: HashSet, + pub epoch_info: EpochInfo, + pub next_epoch_info: EpochInfo, + pub next_next_epoch_info: EpochInfo, } }