feat(state-sync): sync to the current epoch instead of the previous (#12102)

When a node processes a block that's the first block of epoch T and it
realizes that it will need to track shards in epoch T+1 that it doesn't
currently track, it syncs state to the end of epoch T-1 and then applies
chunks until it's caught up. We want to change this so that it syncs
state to epoch T instead, which will make the integration of state
sync/catchup and resharding simpler.
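
For illustration, the catchup flow described above is a two-phase
process, sketched below with hypothetical names (`download_state`,
`blocks_after`, and `apply_chunk` are stand-ins, not nearcore APIs):
first download the state of a target epoch for the newly tracked shard,
then apply the remaining chunks to catch up. This PR only moves the
target from epoch T-1 to epoch T.

```rust
/// Hypothetical sketch of the two-phase catchup flow; the names and the
/// integer epoch/height types are illustrative, not nearcore's real APIs.
struct ShardCatchup {
    shard_id: u64,
    /// Epoch whose state is downloaded first: T-1 before this change, T after it.
    target_epoch: u64,
}

impl ShardCatchup {
    fn run(
        &self,
        download_state: impl Fn(u64, u64),      // (epoch, shard): fetch state parts
        blocks_after: impl Fn(u64) -> Vec<u64>, // block heights after the synced point
        apply_chunk: impl Fn(u64, u64),         // (height, shard): apply that block's chunk
    ) {
        // Phase 1: state sync to the target epoch for the shard we'll track soon.
        download_state(self.target_epoch, self.shard_id);
        // Phase 2: catch up by applying every later chunk for that shard.
        for height in blocks_after(self.target_epoch) {
            apply_chunk(height, self.shard_id);
        }
    }
}
```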

In this PR, this is done by keeping most of the state sync logic
unchanged but changing the "sync_hash" used to identify the point in the
chain we want to sync to. Before, "sync_hash" was the hash of the first
block of an epoch, and the existing state sync logic would have us sync
the state as of two chunks before that hash. So here we change the sync
hash to be the hash of the first block for which at least two new chunks
have been seen for each shard in its epoch. This allows us to sync state
to epoch T with minimal modifications, because the old logic remains
valid.
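
To make the rule concrete, here is a minimal sketch of the sync-hash
selection, assuming simplified stand-in types and accessors
(`next_block_hash`, `chunk_mask`) rather than the real chain store and
epoch manager APIs. The actual implementation in this PR (the
`NewChunkTracker` added to client.rs) does the same walk incrementally,
only advances up to the final head, and still needs the epoch-boundary
check noted in a TODO.

```rust
use std::collections::HashMap;

type BlockHash = [u8; 32];

/// Minimal sketch (not the PR's real code): walk forward from the epoch's first
/// block, counting how many new chunks each shard has had included, and return the
/// hash of the first block at which every shard has seen at least two.
fn find_sync_hash(
    epoch_first_block: BlockHash,
    num_shards: usize,
    next_block_hash: impl Fn(&BlockHash) -> Option<BlockHash>,
    chunk_mask: impl Fn(&BlockHash) -> Vec<bool>, // one bool per shard: chunk included?
) -> Option<BlockHash> {
    let mut new_chunks: HashMap<usize, usize> = (0..num_shards).map(|s| (s, 0)).collect();
    let mut current = epoch_first_block;
    loop {
        // Advance to the next block; if the chain hasn't produced it yet, give up for now.
        current = next_block_hash(&current)?;
        for (shard_index, included) in chunk_mask(&current).iter().enumerate() {
            if *included {
                if let Some(count) = new_chunks.get_mut(&shard_index) {
                    *count += 1;
                }
            }
        }
        if new_chunks.values().all(|count| *count >= 2) {
            // Every shard has at least two new chunks in this epoch: use this block's hash.
            return Some(current);
        }
    }
}
```

The two-new-chunks requirement matters because the existing logic syncs
the state as of two chunks before the sync hash, so with at least two new
chunks per shard in the epoch, that point still falls within epoch T.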

Note that this PR does not implement this new way of syncing for nodes
that have fallen several epochs behind the chain; it only covers nodes
that need to catch up for an upcoming epoch. Support for the former can
be added in a future PR.
marcelo-gonzalez authored Oct 26, 2024
1 parent 10463b2 commit a871b9d
Showing 28 changed files with 1,234 additions and 476 deletions.
11 changes: 11 additions & 0 deletions chain/chain/src/block_processing_utils.rs
@@ -17,6 +17,17 @@ pub(crate) const MAX_PROCESSING_BLOCKS: usize = 5;

/// Contains information from preprocessing a block
pub(crate) struct BlockPreprocessInfo {
/// This field has two related but subtly different meanings. For the first block of an
/// epoch, it will be set to false if we need to download state for shards we'll track in
/// the future but don't track currently. That gives the first meaning: if this is true,
/// then we are ready to apply all chunks and update flat state for shards we'll track in
/// this and the next epoch. This comes into play when we decide which ApplyChunksMode
/// to pass to Chain::apply_chunks_preprocessing().
/// The other meaning is that the catchup code should process this block. When the state sync
/// sync_hash is the first block of the epoch, these two meanings coincide. But if the sync_hash
/// is moved forward in order to sync the current epoch's state instead of the last epoch's, this
/// field being false no longer implies that we want to apply this block during catchup, so some
/// care is needed to ensure we start catchup at the right point in Client::run_catchup().
pub(crate) is_caught_up: bool,
pub(crate) state_sync_info: Option<StateSyncInfo>,
pub(crate) incoming_receipts: HashMap<ShardId, Vec<ReceiptProof>>,
262 changes: 186 additions & 76 deletions chain/chain/src/chain.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions chain/chain/src/garbage_collection.rs
@@ -318,6 +318,7 @@ impl ChainStore {
let header = self.get_block_header(&sync_hash)?;
let prev_hash = *header.prev_hash();
let sync_height = header.height();
// TODO(current_epoch_state_sync): fix this when syncing to the current epoch's state
// The congestion control added a dependency on the prev block when
// applying chunks in a block. This means that we need to keep the
// blocks at sync hash, prev hash and prev prev hash. The height of that
2 changes: 1 addition & 1 deletion chain/chain/src/store/mod.rs
@@ -2561,7 +2561,7 @@ impl<'a> ChainStoreUpdate<'a> {
for state_sync_info in self.add_state_sync_infos.drain(..) {
store_update.set_ser(
DBCol::StateDlInfos,
state_sync_info.epoch_tail_hash.as_ref(),
state_sync_info.epoch_first_block().as_ref(),
&state_sync_info,
)?;
}
4 changes: 2 additions & 2 deletions chain/chain/src/store_validator/validate.rs
@@ -739,8 +739,8 @@ pub(crate) fn state_sync_info_valid(
state_sync_info: &StateSyncInfo,
) -> Result<(), StoreValidatorError> {
check_discrepancy!(
state_sync_info.epoch_tail_hash,
*block_hash,
state_sync_info.epoch_first_block(),
block_hash,
"Invalid StateSyncInfo stored"
);
Ok(())
215 changes: 194 additions & 21 deletions chain/client/src/client.rs
@@ -62,7 +62,8 @@ use near_primitives::merkle::{merklize, MerklePath, PartialMerkleTree};
use near_primitives::network::PeerId;
use near_primitives::receipt::Receipt;
use near_primitives::sharding::{
EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader,
EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader, StateSyncInfo,
StateSyncInfoV1,
};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
@@ -104,6 +105,17 @@ pub enum AdvProduceBlocksMode {
OnlyValid,
}

/// The state associated with downloading state for a shard this node will track in the
/// future but does not currently.
pub struct CatchupState {
/// Manages downloading the state.
pub state_sync: StateSync,
/// Keeps track of state downloads, and gets passed to `state_sync`.
pub sync_status: StateSyncStatus,
/// Manages going back to apply chunks after state has been downloaded.
pub catchup: BlocksCatchUpState,
}

pub struct Client {
/// Adversarial controls - should be enabled only to test disruptive
/// behaviour on chain.
@@ -139,9 +151,12 @@ pub struct Client {
/// Approvals for which we do not have the block yet
pub pending_approvals:
lru::LruCache<ApprovalInner, HashMap<AccountId, (Approval, ApprovalType)>>,
/// A mapping from an epoch that we know needs to be state synced for some shards
/// to a tracker that will find an appropriate sync_hash for state sync to that epoch
catchup_tracker: HashMap<EpochId, NewChunkTracker>,
/// A mapping from a block for which a state sync is underway for the next epoch to the object
/// storing the current status of the state sync and blocks catch-up
pub catchup_state_syncs: HashMap<CryptoHash, (StateSync, StateSyncStatus, BlocksCatchUpState)>,
pub catchup_state_syncs: HashMap<CryptoHash, CatchupState>,
/// Keeps track of information needed to perform the initial Epoch Sync
pub epoch_sync: EpochSync,
/// Keeps track of syncing headers.
@@ -224,6 +239,102 @@ pub struct ProduceChunkResult {
pub transactions_storage_proof: Option<PartialState>,
}

/// This keeps track of the number of new chunks seen in each shard since the block that was passed to new().
/// This whole thing could be replaced with a much simpler function that just computes the number of new chunks
/// in each shard from scratch every time we call it, but in the unlikely and unfortunate case where a shard
/// hasn't had any chunks for a very long time, doing that on every call to run_catchup() would end up being a
/// nontrivial inefficiency.
pub struct NewChunkTracker {
last_checked_hash: CryptoHash,
last_checked_height: BlockHeight,
num_new_chunks: HashMap<ShardId, usize>,
sync_hash: Option<CryptoHash>,
}

impl NewChunkTracker {
fn new(
first_block_hash: CryptoHash,
first_block_height: BlockHeight,
shard_ids: &[ShardId],
) -> Self {
Self {
last_checked_hash: first_block_hash,
last_checked_height: first_block_height,
num_new_chunks: shard_ids.iter().map(|shard_id| (*shard_id, 0)).collect(),
sync_hash: None,
}
}

// TODO(current_epoch_sync_hash): refactor this and use the same logic in get_current_epoch_sync_hash(). Ideally
// that function should go away (at least as it is now) in favor of a more efficient approach that we can call on
// every new block application
fn record_new_chunks(
&mut self,
epoch_manager: &dyn EpochManagerAdapter,
header: &BlockHeader,
) -> Result<bool, Error> {
let shard_layout = epoch_manager.get_shard_layout(header.epoch_id())?;

let mut done = true;
for (shard_id, num_new_chunks) in self.num_new_chunks.iter_mut() {
let shard_index = shard_layout.get_shard_index(*shard_id);
let Some(included) = header.chunk_mask().get(shard_index) else {
return Err(Error::Other(format!(
"can't get shard {} in chunk mask for block {}",
shard_id,
header.hash()
)));
};
if *included {
*num_new_chunks += 1;
}
if *num_new_chunks < 2 {
done = false;
}
}
self.last_checked_hash = *header.hash();
self.last_checked_height = header.height();
Ok(done)
}

fn find_sync_hash(
&mut self,
chain: &Chain,
epoch_manager: &dyn EpochManagerAdapter,
) -> Result<Option<CryptoHash>, Error> {
if let Some(sync_hash) = self.sync_hash {
return Ok(Some(sync_hash));
}

let final_head = chain.final_head()?;

while self.last_checked_height < final_head.height {
let next_hash = match chain.chain_store().get_next_block_hash(&self.last_checked_hash) {
Ok(h) => h,
Err(near_chain_primitives::Error::DBNotFoundErr(_)) => {
return Err(Error::Other(format!(
"final head is #{} {} but get_next_block_hash(#{} {}) is not found",
final_head.height,
final_head.last_block_hash,
self.last_checked_height,
&self.last_checked_hash
)));
}
Err(e) => return Err(e.into()),
};
let next_header = chain.get_block_header(&next_hash)?;
let done = self.record_new_chunks(epoch_manager, &next_header)?;
if done {
// TODO(current_epoch_state_sync): check to make sure the epoch IDs are the same. If there are no new chunks in some shard in the epoch,
// this will be for an epoch ahead of this one
self.sync_hash = Some(next_hash);
break;
}
}
Ok(self.sync_hash)
}
}

impl Client {
pub fn new(
clock: Clock,
@@ -371,6 +482,7 @@ impl Client {
pending_approvals: lru::LruCache::new(
NonZeroUsize::new(num_block_producer_seats).unwrap(),
),
catchup_tracker: HashMap::new(),
catchup_state_syncs: HashMap::new(),
epoch_sync,
header_sync,
@@ -2458,6 +2570,57 @@ impl Client {
Ok(false)
}

/// Find the sync hash. Most of the time it will already be set in `state_sync_info`. If not, try to find it,
/// and set the corresponding field in `state_sync_info`.
fn get_catchup_sync_hash_v1(
&mut self,
state_sync_info: &mut StateSyncInfoV1,
epoch_first_block: &BlockHeader,
) -> Result<Option<CryptoHash>, Error> {
if state_sync_info.sync_hash.is_some() {
return Ok(state_sync_info.sync_hash);
}

let new_chunk_tracker = match self.catchup_tracker.entry(*epoch_first_block.epoch_id()) {
std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
std::collections::hash_map::Entry::Vacant(e) => {
let shard_ids = self.epoch_manager.shard_ids(epoch_first_block.epoch_id())?;
e.insert(NewChunkTracker::new(
*epoch_first_block.hash(),
epoch_first_block.height(),
&shard_ids,
))
}
};

if let Some(sync_hash) =
new_chunk_tracker.find_sync_hash(&self.chain, self.epoch_manager.as_ref())?
{
state_sync_info.sync_hash = Some(sync_hash);
let mut update = self.chain.mut_chain_store().store_update();
// note that iterate_state_sync_infos() collects everything into a Vec, so we're not
// actually writing to the DB while actively iterating this column
update.add_state_sync_info(StateSyncInfo::V1(state_sync_info.clone()));
// TODO: would be nice to be able to propagate context up the call stack so we can just log
// once at the top with all the info. Otherwise this error will look very cryptic
update.commit()?;
}
Ok(state_sync_info.sync_hash)
}

/// Find the sync hash. If syncing to the old epoch's state, it's always set. If syncing to
/// the current epoch's state, it might not yet be known, in which case we try to find it.
fn get_catchup_sync_hash(
&mut self,
state_sync_info: &mut StateSyncInfo,
epoch_first_block: &BlockHeader,
) -> Result<Option<CryptoHash>, Error> {
match state_sync_info {
StateSyncInfo::V0(info) => Ok(Some(info.sync_hash)),
StateSyncInfo::V1(info) => self.get_catchup_sync_hash_v1(info, epoch_first_block),
}
}

/// Walks through all the ongoing state syncs for future epochs and processes them
pub fn run_catchup(
&mut self,
@@ -2469,17 +2632,27 @@
let _span = debug_span!(target: "sync", "run_catchup").entered();
let me = signer.as_ref().map(|x| x.validator_id().clone());

for (sync_hash, state_sync_info) in self.chain.chain_store().iterate_state_sync_infos()? {
assert_eq!(sync_hash, state_sync_info.epoch_tail_hash);
for (epoch_first_block, mut state_sync_info) in
self.chain.chain_store().iterate_state_sync_infos()?
{
assert_eq!(&epoch_first_block, state_sync_info.epoch_first_block());

let state_sync_timeout = self.config.state_sync_timeout;
let block_header = self.chain.get_block(&sync_hash)?.header().clone();
let block_header = self.chain.get_block(&epoch_first_block)?.header().clone();
let epoch_id = block_header.epoch_id();

let (state_sync, status, blocks_catch_up_state) =
self.catchup_state_syncs.entry(sync_hash).or_insert_with(|| {
tracing::debug!(target: "client", ?sync_hash, "inserting new state sync");
(
StateSync::new(
let sync_hash = match self.get_catchup_sync_hash(&mut state_sync_info, &block_header)? {
Some(h) => h,
None => continue,
};

let CatchupState { state_sync, sync_status: status, catchup } = self
.catchup_state_syncs
.entry(sync_hash)
.or_insert_with(|| {
tracing::debug!(target: "client", ?epoch_first_block, ?sync_hash, "inserting new state sync");
CatchupState {
state_sync: StateSync::new(
self.clock.clone(),
self.runtime_adapter.store().clone(),
self.epoch_manager.clone(),
Expand All @@ -2492,21 +2665,20 @@ impl Client {
self.state_sync_future_spawner.clone(),
true,
),
StateSyncStatus {
sync_status: StateSyncStatus {
sync_hash,
sync_status: HashMap::new(),
download_tasks: Vec::new(),
computation_tasks: Vec::new(),
},
BlocksCatchUpState::new(sync_hash, *epoch_id),
)
catchup: BlocksCatchUpState::new(sync_hash, *epoch_id),
}
});

// For colour decorators to work, they need to be printed directly. Otherwise the decorators get escaped, garble output and don't add colours.
debug!(target: "catchup", ?me, ?sync_hash, progress_per_shard = ?status.sync_status, "Catchup");

let tracking_shards: Vec<ShardId> =
state_sync_info.shards.iter().map(|tuple| tuple.0).collect();
state_sync_info.shards().iter().map(|tuple| tuple.0).collect();

// Initialize the new shard sync to contain the shards to split at
// first. It will get updated with the shard sync download status
Expand All @@ -2518,19 +2690,20 @@ impl Client {
self.chain.catchup_blocks_step(
&me,
&sync_hash,
blocks_catch_up_state,
catchup,
block_catch_up_task_scheduler,
)?;

if blocks_catch_up_state.is_finished() {
if catchup.is_finished() {
let mut block_processing_artifacts = BlockProcessingArtifact::default();

self.chain.finish_catchup_blocks(
&me,
&epoch_first_block,
&sync_hash,
&mut block_processing_artifacts,
apply_chunks_done_sender.clone(),
&blocks_catch_up_state.done_blocks,
&catchup.done_blocks,
)?;

self.process_block_processing_artifact(block_processing_artifacts, &signer);
@@ -2716,11 +2889,11 @@ impl Client {
impl Client {
pub fn get_catchup_status(&self) -> Result<Vec<CatchupStatusView>, near_chain::Error> {
let mut ret = vec![];
for (sync_hash, (_, shard_sync_state, block_catchup_state)) in
for (sync_hash, CatchupState { sync_status, catchup, .. }) in
self.catchup_state_syncs.iter()
{
let sync_block_height = self.chain.get_block_header(sync_hash)?.height();
let shard_sync_status: HashMap<_, _> = shard_sync_state
let shard_sync_status: HashMap<_, _> = sync_status
.sync_status
.iter()
.map(|(shard_id, state)| (*shard_id, state.to_string()))
Expand All @@ -2729,7 +2902,7 @@ impl Client {
sync_block_hash: *sync_hash,
sync_block_height,
shard_sync_status,
blocks_to_catchup: self.chain.get_block_catchup_status(block_catchup_state),
blocks_to_catchup: self.chain.get_block_catchup_status(catchup),
});
}
Ok(ret)