From 27476317c30596241ff30e936ea58e860b31fed7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Chuda=C5=9B?= <18039094+staffik@users.noreply.github.com> Date: Tue, 29 Oct 2024 13:28:43 +0100 Subject: [PATCH] feat(resharding): Resharding state mapping integration (#12269) Tracking issue: https://github.com/near/nearcore/issues/12050 Summary: - Refactor `ReshardingManager::process_memtrie_resharding_storage_update()` into `start_resharding()` as it is an entry point to start resharding. - Add `set_state_shard_uid_mapping()` and call it from `start_resharding()`. - Test that State mapping integrates with resharding flow in `resharding_v3.rs` testloop. Test would fail with `MissingTrieValue` for children shards if disabled `set_state_shard_uid_mapping`. Perhaps `test_resharding_v3_base` should be as minimal as possible and I should not test State mapping there. But for now I do not see much value in doing it differently. --- chain/chain/src/chain.rs | 2 +- chain/chain/src/resharding/event_type.rs | 6 + chain/chain/src/resharding/manager.rs | 110 ++++++++++++++---- core/store/src/adapter/trie_store.rs | 3 +- .../src/test_loop/tests/resharding_v3.rs | 49 +++++++- 5 files changed, 140 insertions(+), 30 deletions(-) diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 1f011f7a1a6..50eefee8c6b 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -2029,7 +2029,7 @@ impl Chain { if need_storage_update { // TODO(resharding): consider adding to catchup flow. - self.resharding_manager.process_memtrie_resharding_storage_update( + self.resharding_manager.start_resharding( self.chain_store.store_update(), &block, shard_uid, diff --git a/chain/chain/src/resharding/event_type.rs b/chain/chain/src/resharding/event_type.rs index b3fb4f95a8d..909956cfac9 100644 --- a/chain/chain/src/resharding/event_type.rs +++ b/chain/chain/src/resharding/event_type.rs @@ -32,6 +32,12 @@ pub struct ReshardingSplitShardParams { pub prev_block_hash: CryptoHash, } +impl ReshardingSplitShardParams { + pub fn children_shards(&self) -> Vec { + vec![self.left_child_shard, self.right_child_shard] + } +} + impl ReshardingEventType { /// Takes as input a [ShardLayout] definition and deduces which kind of resharding operation /// must be performed. diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs index 456ca99de01..0d3f5318ae5 100644 --- a/chain/chain/src/resharding/manager.rs +++ b/chain/chain/src/resharding/manager.rs @@ -1,6 +1,7 @@ +use std::io; use std::sync::Arc; -use super::event_type::ReshardingEventType; +use super::event_type::{ReshardingEventType, ReshardingSplitShardParams}; use super::types::ReshardingSender; use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController}; use crate::types::RuntimeAdapter; @@ -13,7 +14,7 @@ use near_primitives::challenge::PartialState; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout}; use near_primitives::types::chunk_extra::ChunkExtra; -use near_store::adapter::StoreUpdateAdapter; +use near_store::adapter::{StoreAdapter, StoreUpdateAdapter}; use near_store::trie::mem::resharding::RetainMode; use near_store::{DBCol, PartialStorage, ShardTries, ShardUId, Store}; @@ -49,13 +50,10 @@ impl ReshardingManager { Self { store, epoch_manager, resharding_config, flat_storage_resharder, resharding_handle } } - /// If shard layout changes after the given block, creates temporary - /// memtries for new shards to be able to process them in the next epoch. - /// Note this doesn't complete resharding, proper memtries are to be - /// created later. - pub fn process_memtrie_resharding_storage_update( + /// Trigger resharding if shard layout changes after the given block. + pub fn start_resharding( &mut self, - mut chain_store_update: ChainStoreUpdate, + chain_store_update: ChainStoreUpdate, block: &Block, shard_uid: ShardUId, tries: ShardTries, @@ -63,7 +61,7 @@ impl ReshardingManager { let block_hash = block.hash(); let block_height = block.header().height(); let _span = tracing::debug_span!( - target: "resharding", "process_memtrie_resharding_storage_update", + target: "resharding", "start_resharding", ?block_hash, block_height, ?shard_uid) .entered(); @@ -84,34 +82,102 @@ impl ReshardingManager { tracing::debug!(target: "resharding", ?next_shard_layout, "next shard layout is not v2, skipping"); return Ok(()); } + let resharding_event_type = ReshardingEventType::from_shard_layout(&next_shard_layout, *block_hash, *prev_hash)?; - let Some(ReshardingEventType::SplitShard(split_shard_event)) = resharding_event_type else { - tracing::debug!(target: "resharding", ?resharding_event_type, "resharding event type is not split shard, skipping"); - return Ok(()); + match resharding_event_type { + Some(ReshardingEventType::SplitShard(split_shard_event)) => { + self.split_shard( + chain_store_update, + block, + shard_uid, + tries, + split_shard_event, + next_shard_layout, + )?; + } + None => { + tracing::warn!(target: "resharding", ?resharding_event_type, "unsupported resharding event type, skipping"); + } }; + Ok(()) + } + + fn split_shard( + &mut self, + chain_store_update: ChainStoreUpdate, + block: &Block, + shard_uid: ShardUId, + tries: ShardTries, + split_shard_event: ReshardingSplitShardParams, + next_shard_layout: ShardLayout, + ) -> Result<(), Error> { if split_shard_event.parent_shard != shard_uid { let parent_shard = split_shard_event.parent_shard; - tracing::debug!(target: "resharding", ?parent_shard, "shard uid does not match event parent shard, skipping"); + tracing::debug!(target: "resharding", ?parent_shard, "ShardUId does not match event parent shard, skipping"); return Ok(()); } - // TODO(resharding): what if node doesn't have memtrie? just pause - // processing? - // TODO(resharding): fork handling. if epoch is finalized on different - // blocks, the second finalization will crash. - tries.freeze_mem_tries( + // Reshard the State column by setting ShardUId mapping from children to parent. + self.set_state_shard_uid_mapping(&split_shard_event)?; + + // Create temporary children memtries by freezing parent memtrie and referencing it. + self.process_memtrie_resharding_storage_update( + chain_store_update, + block, shard_uid, - vec![split_shard_event.left_child_shard, split_shard_event.right_child_shard], + tries, + split_shard_event.clone(), )?; // Trigger resharding of flat storage. self.flat_storage_resharder.start_resharding( - ReshardingEventType::SplitShard(split_shard_event.clone()), + ReshardingEventType::SplitShard(split_shard_event), &next_shard_layout, )?; - let chunk_extra = self.get_chunk_extra(block_hash, &shard_uid)?; + Ok(()) + } + + /// Store in the database the mapping of ShardUId from children to the parent shard, + /// so that subsequent accesses to the State will use the parent shard's UId as a prefix for the database key. + fn set_state_shard_uid_mapping( + &mut self, + split_shard_event: &ReshardingSplitShardParams, + ) -> io::Result<()> { + let mut store_update = self.store.trie_store().store_update(); + let parent_shard_uid = split_shard_event.parent_shard; + // TODO(reshardingV3) No need to set the mapping for children shards that we won't track just after resharding? + for child_shard_uid in split_shard_event.children_shards() { + store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid); + } + store_update.commit() + } + + /// Creates temporary memtries for new shards to be able to process them in the next epoch. + /// Note this doesn't complete memtries resharding, proper memtries are to be created later. + fn process_memtrie_resharding_storage_update( + &mut self, + mut chain_store_update: ChainStoreUpdate, + block: &Block, + parent_shard_uid: ShardUId, + tries: ShardTries, + split_shard_event: ReshardingSplitShardParams, + ) -> Result<(), Error> { + let block_hash = block.hash(); + let block_height = block.header().height(); + let _span = tracing::debug_span!( + target: "resharding", "process_memtrie_resharding_storage_update", + ?block_hash, block_height, ?parent_shard_uid) + .entered(); + + // TODO(resharding): what if node doesn't have memtrie? just pause + // processing? + // TODO(resharding): fork handling. if epoch is finalized on different + // blocks, the second finalization will crash. + tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?; + + let chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?; let boundary_account = split_shard_event.boundary_account; let mut trie_store_update = self.store.store_update(); @@ -126,7 +192,7 @@ impl ReshardingManager { "Memtrie not loaded. Cannot process memtrie resharding storage update for block {:?}, shard {:?}", block_hash, - shard_uid + parent_shard_uid, ); return Err(Error::Other("Memtrie not loaded".to_string())); }; diff --git a/core/store/src/adapter/trie_store.rs b/core/store/src/adapter/trie_store.rs index 2a98abb3bcb..e01a4232e4b 100644 --- a/core/store/src/adapter/trie_store.rs +++ b/core/store/src/adapter/trie_store.rs @@ -160,8 +160,7 @@ impl<'a> TrieStoreUpdateAdapter<'a> { /// Set the mapping from `child_shard_uid` to `parent_shard_uid`. /// Used by Resharding V3 for State mapping. - #[cfg(test)] - fn set_shard_uid_mapping(&mut self, child_shard_uid: ShardUId, parent_shard_uid: ShardUId) { + pub fn set_shard_uid_mapping(&mut self, child_shard_uid: ShardUId, parent_shard_uid: ShardUId) { self.store_update.set( DBCol::StateShardUIdMapping, child_shard_uid.to_bytes().as_ref(), diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs index f2e7c65cf8b..fc335937d15 100644 --- a/integration-tests/src/test_loop/tests/resharding_v3.rs +++ b/integration-tests/src/test_loop/tests/resharding_v3.rs @@ -1,15 +1,20 @@ +use borsh::BorshDeserialize; use itertools::Itertools; use near_async::test_loop::data::TestLoopData; use near_async::time::Duration; +use near_chain::ChainStoreAccess; use near_chain_configs::test_genesis::TestGenesisBuilder; use near_client::Client; use near_o11y::testonly::init_test_logger; use near_primitives::epoch_manager::EpochConfigStore; -use near_primitives::shard_layout::ShardLayout; +use near_primitives::hash::CryptoHash; +use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout}; use near_primitives::state_record::StateRecord; use near_primitives::types::{AccountId, ShardId}; use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION}; -use near_store::ShardUId; +use near_store::adapter::StoreAdapter; +use near_store::db::refcount::decode_value_with_rc; +use near_store::{DBCol, ShardUId}; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; @@ -59,6 +64,37 @@ fn print_and_assert_shard_accounts(client: &Client) { } } +/// Asserts that all parent shard State is accessible via parent and children shards. +fn check_state_shard_uid_mapping_after_resharding(client: &Client, parent_shard_uid: ShardUId) { + let tip = client.chain.head().unwrap(); + let epoch_id = tip.epoch_id; + let epoch_config = client.epoch_manager.get_epoch_config(&epoch_id).unwrap(); + let children_shard_uids = + epoch_config.shard_layout.get_children_shards_uids(parent_shard_uid.shard_id()).unwrap(); + assert_eq!(children_shard_uids.len(), 2); + + let store = client.chain.chain_store.store().trie_store(); + for kv in store.store().iter_raw_bytes(DBCol::State) { + let (key, value) = kv.unwrap(); + let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap(); + // Just after resharding, no State data must be keyed using children ShardUIds. + assert!(!children_shard_uids.contains(&shard_uid)); + if shard_uid != parent_shard_uid { + continue; + } + let node_hash = CryptoHash::try_from_slice(&key[8..]).unwrap(); + let (value, _) = decode_value_with_rc(&value); + let parent_value = store.get(parent_shard_uid, &node_hash); + // Parent shard data must still be accessible using parent ShardUId. + assert_eq!(&parent_value.unwrap()[..], value.unwrap()); + // All parent shard data is available via both children shards. + for child_shard_uid in &children_shard_uids { + let child_value = store.get(*child_shard_uid, &node_hash); + assert_eq!(&child_value.unwrap()[..], value.unwrap()); + } + } +} + /// Base setup to check sanity of Resharding V3. /// TODO(#11881): add the following scenarios: /// - Nodes must not track all shards. State sync must succeed. @@ -100,7 +136,8 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap bool { @@ -160,7 +199,7 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap