feat(epoch-sync): update EpochSyncInfo (#10081)
#10030 

Changing EpochSyncInfo to contain all data outlined in
https://docs.google.com/document/d/14Itc9Hs7ewTRmcGANid9UCaYcJzaLzzM7FsvYYIFKDY/edit#heading=h.oixofhftbu1p

The blocking generation of epoch block hashes is a known problem.
IMO this is not a problem for header sync. Still, during regular block
processing it is not ideal to spend additional time reading something
that should have been kept in memory during the epoch.
I will change this part once I have an epoch sync node setup that
allows for realistic delay estimation.
In the meantime the only optimisation is to collect this information
from BlockInfos rather than from BlockHeaders, as the BlockInfo column
is much smaller.

---------

Co-authored-by: nikurt <[email protected]>
posvyatokum and nikurt authored Nov 3, 2023
1 parent 2fe0779 commit c87354d
Showing 6 changed files with 165 additions and 44 deletions.
2 changes: 1 addition & 1 deletion chain/chain/Cargo.toml
@@ -49,7 +49,7 @@ byzantine_asserts = []
expensive_tests = []
test_features = []
no_cache = ["near-store/no_cache"]
new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync"]
new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync", "near-epoch-manager/new_epoch_sync"]

protocol_feature_reject_blocks_with_outdated_protocol_version = [
"near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version",
122 changes: 91 additions & 31 deletions chain/chain/src/chain.rs
@@ -39,10 +39,7 @@ use near_primitives::challenge::{
};
use near_primitives::checked_feature;
#[cfg(feature = "new_epoch_sync")]
use near_primitives::epoch_manager::{
block_info::BlockInfo,
epoch_sync::{BlockHeaderPair, EpochSyncInfo},
};
use near_primitives::epoch_manager::{block_info::BlockInfo, epoch_sync::EpochSyncInfo};
use near_primitives::errors::EpochError;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{
@@ -1964,11 +1961,11 @@ impl Chain {

#[cfg(feature = "new_epoch_sync")]
{
// At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache.
// At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache because of `add_validator_proposals` call.
let block_info = self.epoch_manager.get_block_info(header.hash())?;

let mut chain_update = self.chain_update();
chain_update.save_epoch_sync_info(header.epoch_id(), header, &block_info)?;
chain_update.save_epoch_sync_info(&block_info)?;
chain_update.commit()?;
}
}
@@ -5500,9 +5497,9 @@ impl<'a> ChainUpdate<'a> {

#[cfg(feature = "new_epoch_sync")]
{
// BlockInfo should be already recorded in epoch_manager cache
// BlockInfo should be already recorded in epoch_manager cache because of `add_validator_proposals` call
let block_info = self.epoch_manager.get_block_info(block.hash())?;
self.save_epoch_sync_info(block.header().epoch_id(), block.header(), &block_info)?;
self.save_epoch_sync_info(&block_info)?;
}

// Add validated block to the db, even if it's not the canonical fork.
@@ -5883,29 +5880,30 @@ impl<'a> ChainUpdate<'a> {
/// If the block is the last one in the epoch
/// construct and record `EpochSyncInfo` to `self.chain_store_update`.
#[cfg(feature = "new_epoch_sync")]
fn save_epoch_sync_info(
&mut self,
epoch_id: &EpochId,
last_block_header: &BlockHeader,
last_block_info: &BlockInfo,
) -> Result<(), Error> {
if self.epoch_manager.is_next_block_epoch_start(last_block_header.hash())? {
fn save_epoch_sync_info(&mut self, last_block_info: &BlockInfo) -> Result<(), Error> {
let epoch_id = last_block_info.epoch_id();
if self.epoch_manager.is_next_block_epoch_start(last_block_info.hash())? {
let mut store_update = self.chain_store_update.store().store_update();
store_update
.set_ser(
DBCol::EpochSyncInfo,
epoch_id.as_ref(),
&self.create_epoch_sync_info(last_block_header, last_block_info)?,
&self.create_epoch_sync_info(last_block_info)?,
)
.map_err(EpochError::from)?;
self.chain_store_update.merge(store_update);
}
Ok(())
}

/// Create a pair of `BlockHeader`s necessary to create `BlockInfo` for `block_hash`
/// Create a pair of `BlockHeader`s necessary to create `BlockInfo` for `block_hash`:
/// - header for `block_hash`
/// - header for `last_final_block` of `block_hash` header
#[cfg(feature = "new_epoch_sync")]
fn get_header_pair(&self, block_hash: &CryptoHash) -> Result<BlockHeaderPair, Error> {
fn get_header_pair(
&self,
block_hash: &CryptoHash,
) -> Result<(BlockHeader, BlockHeader), Error> {
let header = self.chain_store_update.get_block_header(block_hash)?;
// `block_hash` can correspond to genesis block, for which there is no last final block recorded,
// because `last_final_block` for genesis is `CryptoHash::default()`
@@ -5919,25 +5917,87 @@ impl<'a> ChainUpdate<'a> {
self.chain_store_update.get_block_header(header.last_final_block())?
}
};
Ok(BlockHeaderPair { header, last_finalised_header })
Ok((header, last_finalised_header))
}

/// Data that is necessary to prove Epoch in new Epoch Sync.
/// For epoch sync we need to save:
/// - first header of the epoch
/// - last header of the epoch
/// - prev last header of the epoch
/// - every header on chain from `last_final_block` to the end of the epoch
/// - (*) header of the `last_final_block` for each of the previously mentioned headers
///
/// Because headers may repeat between those points, we use one `HashMap` to store them indexed by hash.
///
/// Headers not marked with (*) need to be saved on the syncing node.
/// Headers marked with (*) are only needed for `EpochSyncInfo` validation.
#[cfg(feature = "new_epoch_sync")]
fn create_epoch_sync_info(
fn get_epoch_sync_info_headers(
&self,
last_block_header: &BlockHeader,
last_block_info: &BlockInfo,
) -> Result<EpochSyncInfo, Error> {
let last = self.get_header_pair(last_block_header.hash())?;
let prev_last = self.get_header_pair(last_block_header.prev_hash())?;
let first = self.get_header_pair(last_block_info.epoch_first_block())?;
let epoch_info = self.epoch_manager.get_epoch_info(last_block_info.epoch_id())?;
) -> Result<(HashMap<CryptoHash, BlockHeader>, HashSet<CryptoHash>), Error> {
let mut headers = HashMap::new();
let mut headers_to_save = HashSet::new();

let mut add_header = |block_hash: &CryptoHash| -> Result<(), Error> {
let (header, last_finalised_header) = self.get_header_pair(block_hash)?;
headers.insert(*header.hash(), header);
headers.insert(*last_finalised_header.hash(), last_finalised_header);
headers_to_save.insert(*block_hash);
Ok(())
};

add_header(last_block_info.epoch_first_block())?;
add_header(last_block_info.hash())?;
add_header(last_block_info.prev_hash())?;

// If we didn't add `last_final_block_hash` yet, go down the chain until we find it.
if last_block_info.hash() != last_block_info.last_final_block_hash()
&& last_block_info.prev_hash() != last_block_info.last_final_block_hash()
{
let mut current_header =
self.chain_store_update.get_block_header(last_block_info.prev_hash())?;
while current_header.hash() != last_block_info.last_final_block_hash() {
// This should only happen if the BlockInfo data is incorrect.
// Without this assert, such a BlockInfo would cause an infinite loop instead of a crash with a message.
assert!(
current_header.height() > last_block_info.last_finalized_height(),
"Reached block at height {:?} with hash {:?} from {:?}",
current_header.height(),
current_header.hash(),
last_block_info
);

// current_header was already added, as we start from current_header = prev_header.
current_header =
self.chain_store_update.get_block_header(current_header.prev_hash())?;
add_header(current_header.hash())?;
}
}

Ok((headers, headers_to_save))
}

/// Data that is necessary to prove Epoch in new Epoch Sync.
#[cfg(feature = "new_epoch_sync")]
fn create_epoch_sync_info(&self, last_block_info: &BlockInfo) -> Result<EpochSyncInfo, Error> {
let mut all_block_hashes = self.epoch_manager.get_all_epoch_hashes(last_block_info)?;
all_block_hashes.reverse();

let (headers, headers_to_save) = self.get_epoch_sync_info_headers(last_block_info)?;

let epoch_id = last_block_info.epoch_id();
let next_epoch_id = self.epoch_manager.get_next_epoch_id(last_block_info.hash())?;
let next_next_epoch_id = EpochId(*last_block_info.hash());

Ok(EpochSyncInfo {
last,
prev_last,
first,
block_producers: epoch_info.validators_iter().collect(),
all_block_hashes,
headers,
headers_to_save,
epoch_info: (*self.epoch_manager.get_epoch_info(epoch_id)?).clone(),
next_epoch_info: (*self.epoch_manager.get_epoch_info(&next_epoch_id)?).clone(),
next_next_epoch_info: (*self.epoch_manager.get_epoch_info(&next_next_epoch_id)?)
.clone(),
})
}
}
8 changes: 8 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
@@ -945,6 +945,14 @@ impl EpochManagerAdapter for MockEpochManager {
let next_shard_layout = self.get_shard_layout(&next_epoch_id)?;
Ok(shard_layout != next_shard_layout)
}

#[cfg(feature = "new_epoch_sync")]
fn get_all_epoch_hashes(
&self,
_last_block_info: &BlockInfo,
) -> Result<Vec<CryptoHash>, EpochError> {
Ok(vec![])
}
}

impl RuntimeAdapter for KeyValueRuntime {
18 changes: 18 additions & 0 deletions chain/epoch-manager/src/adapter.rs
@@ -376,6 +376,15 @@ pub trait EpochManagerAdapter: Send + Sync {
) -> Result<bool, EpochError>;

fn will_shard_layout_change(&self, parent_hash: &CryptoHash) -> Result<bool, EpochError>;

/// Returns a vector of all hashes in the epoch ending with `last_block_info`.
/// Only returns blocks on the chain of `last_block_info`.
/// Hashes are returned in the order from the last block to the first block.
#[cfg(feature = "new_epoch_sync")]
fn get_all_epoch_hashes(
&self,
last_block_info: &BlockInfo,
) -> Result<Vec<CryptoHash>, EpochError>;
}

impl EpochManagerAdapter for EpochManagerHandle {
@@ -944,4 +953,13 @@ impl EpochManagerAdapter for EpochManagerHandle {
let epoch_manager = self.read();
epoch_manager.will_shard_layout_change(parent_hash)
}

#[cfg(feature = "new_epoch_sync")]
fn get_all_epoch_hashes(
&self,
last_block_info: &BlockInfo,
) -> Result<Vec<CryptoHash>, EpochError> {
let epoch_manager = self.read();
epoch_manager.get_all_epoch_hashes(last_block_info)
}
}
34 changes: 34 additions & 0 deletions chain/epoch-manager/src/lib.rs
@@ -1843,4 +1843,38 @@ impl EpochManager {
Ok(None)
}
}

#[cfg(feature = "new_epoch_sync")]
pub fn get_all_epoch_hashes(
&self,
last_block_info: &BlockInfo,
) -> Result<Vec<CryptoHash>, EpochError> {
let _span =
tracing::debug_span!(target: "epoch_manager", "get_all_epoch_hashes", ?last_block_info)
.entered();

let mut result = vec![];
let first_epoch_block_height =
self.get_block_info(last_block_info.epoch_first_block())?.height();
let mut current_block_info = last_block_info.clone();
while current_block_info.hash() != last_block_info.epoch_first_block() {
// Check that we didn't reach the previous epoch.
// This should only happen if the BlockInfo data is incorrect.
// Without this assert, such a BlockInfo would cause an infinite loop instead of a crash with a message.
assert!(
current_block_info.height() > first_epoch_block_height,
"Reached {:?} from {:?} when first epoch height is {:?}",
current_block_info,
last_block_info,
first_epoch_block_height
);

result.push(*current_block_info.hash());
current_block_info = (*self.get_block_info(current_block_info.prev_hash())?).clone();
}
// First block of an epoch is not covered by the while loop.
result.push(*current_block_info.hash());

Ok(result)
}
}
25 changes: 13 additions & 12 deletions core/primitives/src/epoch_manager.rs
@@ -1232,22 +1232,23 @@ pub enum SlashState {
#[cfg(feature = "new_epoch_sync")]
pub mod epoch_sync {
use crate::block_header::BlockHeader;
use crate::types::validator_stake::ValidatorStake;
use crate::epoch_manager::epoch_info::EpochInfo;
use borsh::{BorshDeserialize, BorshSerialize};

#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
pub struct BlockHeaderPair {
pub header: BlockHeader,
pub last_finalised_header: BlockHeader,
}
use near_primitives_core::hash::CryptoHash;
use std::collections::{HashMap, HashSet};

/// Struct to keep all the info that is transferred for one epoch during Epoch Sync.
#[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)]
pub struct EpochSyncInfo {
/// None is only used for corner case of the first epoch
pub first: BlockHeaderPair,
pub last: BlockHeaderPair,
pub prev_last: BlockHeaderPair,
pub block_producers: Vec<ValidatorStake>,
/// All block hashes of this epoch. In order of production.
pub all_block_hashes: Vec<CryptoHash>,
/// All headers relevant to epoch sync.
/// Contains epoch headers that need to be saved + supporting headers needed for validation.
pub headers: HashMap<CryptoHash, BlockHeader>,
/// Hashes of headers that need to be validated and saved.
pub headers_to_save: HashSet<CryptoHash>,
pub epoch_info: EpochInfo,
pub next_epoch_info: EpochInfo,
pub next_next_epoch_info: EpochInfo,
}
}
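
As a usage sketch for the new `headers` / `headers_to_save` fields (per the doc comment on `get_epoch_sync_info_headers` in chain.rs above), here is one hypothetical way a syncing node might pick out the headers it has to persist. The helper is illustrative only and not part of this commit; it assumes the `new_epoch_sync` feature is enabled:

```rust
use near_primitives::block_header::BlockHeader;
use near_primitives::epoch_manager::epoch_sync::EpochSyncInfo;

// Sketch only: entries listed in `headers_to_save` are the headers a syncing
// node persists; the remaining entries in `headers` exist solely to validate them.
fn headers_for_storage(info: &EpochSyncInfo) -> Vec<&BlockHeader> {
    info.headers_to_save
        .iter()
        .filter_map(|hash| info.headers.get(hash))
        .collect()
}
```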
