Skip to content

Commit

Permalink
feat(resharding): Resharding state mapping integration (#12269)
Browse files Browse the repository at this point in the history
Tracking issue: #12050

Summary:
- Refactor
`ReshardingManager::process_memtrie_resharding_storage_update()` into
`start_resharding()` as it is an entry point to start resharding.
- Add `set_state_shard_uid_mapping()` and call it from
`start_resharding()`.
- Test that State mapping integrates with resharding flow in
`resharding_v3.rs` testloop.

The test would fail with `MissingTrieValue` for the children shards if
`set_state_shard_uid_mapping` were disabled.

Perhaps `test_resharding_v3_base` should be kept as minimal as possible, in
which case the State mapping check should not live there. For now, however, I
do not see much value in doing it differently.
  • Loading branch information
staffik authored Oct 29, 2024
1 parent 598a2f4 commit 2747631
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 30 deletions.
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2029,7 +2029,7 @@ impl Chain {

if need_storage_update {
// TODO(resharding): consider adding to catchup flow.
self.resharding_manager.process_memtrie_resharding_storage_update(
self.resharding_manager.start_resharding(
self.chain_store.store_update(),
&block,
shard_uid,
Expand Down
6 changes: 6 additions & 0 deletions chain/chain/src/resharding/event_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ pub struct ReshardingSplitShardParams {
pub prev_block_hash: CryptoHash,
}

impl ReshardingSplitShardParams {
    /// Returns the ShardUIds of the two shards produced by this split,
    /// left child first, right child second.
    pub fn children_shards(&self) -> Vec<ShardUId> {
        let mut children = Vec::with_capacity(2);
        children.push(self.left_child_shard);
        children.push(self.right_child_shard);
        children
    }
}

impl ReshardingEventType {
/// Takes as input a [ShardLayout] definition and deduces which kind of resharding operation
/// must be performed.
Expand Down
110 changes: 88 additions & 22 deletions chain/chain/src/resharding/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::io;
use std::sync::Arc;

use super::event_type::ReshardingEventType;
use super::event_type::{ReshardingEventType, ReshardingSplitShardParams};
use super::types::ReshardingSender;
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController};
use crate::types::RuntimeAdapter;
Expand All @@ -13,7 +14,7 @@ use near_primitives::challenge::PartialState;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout};
use near_primitives::types::chunk_extra::ChunkExtra;
use near_store::adapter::StoreUpdateAdapter;
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::trie::mem::resharding::RetainMode;
use near_store::{DBCol, PartialStorage, ShardTries, ShardUId, Store};

Expand Down Expand Up @@ -49,21 +50,18 @@ impl ReshardingManager {
Self { store, epoch_manager, resharding_config, flat_storage_resharder, resharding_handle }
}

/// If shard layout changes after the given block, creates temporary
/// memtries for new shards to be able to process them in the next epoch.
/// Note this doesn't complete resharding, proper memtries are to be
/// created later.
pub fn process_memtrie_resharding_storage_update(
/// Trigger resharding if shard layout changes after the given block.
pub fn start_resharding(
&mut self,
mut chain_store_update: ChainStoreUpdate,
chain_store_update: ChainStoreUpdate,
block: &Block,
shard_uid: ShardUId,
tries: ShardTries,
) -> Result<(), Error> {
let block_hash = block.hash();
let block_height = block.header().height();
let _span = tracing::debug_span!(
target: "resharding", "process_memtrie_resharding_storage_update",
target: "resharding", "start_resharding",
?block_hash, block_height, ?shard_uid)
.entered();

Expand All @@ -84,34 +82,102 @@ impl ReshardingManager {
tracing::debug!(target: "resharding", ?next_shard_layout, "next shard layout is not v2, skipping");
return Ok(());
}

let resharding_event_type =
ReshardingEventType::from_shard_layout(&next_shard_layout, *block_hash, *prev_hash)?;
let Some(ReshardingEventType::SplitShard(split_shard_event)) = resharding_event_type else {
tracing::debug!(target: "resharding", ?resharding_event_type, "resharding event type is not split shard, skipping");
return Ok(());
match resharding_event_type {
Some(ReshardingEventType::SplitShard(split_shard_event)) => {
self.split_shard(
chain_store_update,
block,
shard_uid,
tries,
split_shard_event,
next_shard_layout,
)?;
}
None => {
tracing::warn!(target: "resharding", ?resharding_event_type, "unsupported resharding event type, skipping");
}
};
Ok(())
}

fn split_shard(
&mut self,
chain_store_update: ChainStoreUpdate,
block: &Block,
shard_uid: ShardUId,
tries: ShardTries,
split_shard_event: ReshardingSplitShardParams,
next_shard_layout: ShardLayout,
) -> Result<(), Error> {
if split_shard_event.parent_shard != shard_uid {
let parent_shard = split_shard_event.parent_shard;
tracing::debug!(target: "resharding", ?parent_shard, "shard uid does not match event parent shard, skipping");
tracing::debug!(target: "resharding", ?parent_shard, "ShardUId does not match event parent shard, skipping");
return Ok(());
}

// TODO(resharding): what if node doesn't have memtrie? just pause
// processing?
// TODO(resharding): fork handling. if epoch is finalized on different
// blocks, the second finalization will crash.
tries.freeze_mem_tries(
// Reshard the State column by setting ShardUId mapping from children to parent.
self.set_state_shard_uid_mapping(&split_shard_event)?;

// Create temporary children memtries by freezing parent memtrie and referencing it.
self.process_memtrie_resharding_storage_update(
chain_store_update,
block,
shard_uid,
vec![split_shard_event.left_child_shard, split_shard_event.right_child_shard],
tries,
split_shard_event.clone(),
)?;

// Trigger resharding of flat storage.
self.flat_storage_resharder.start_resharding(
ReshardingEventType::SplitShard(split_shard_event.clone()),
ReshardingEventType::SplitShard(split_shard_event),
&next_shard_layout,
)?;

let chunk_extra = self.get_chunk_extra(block_hash, &shard_uid)?;
Ok(())
}

/// Store in the database the mapping of ShardUId from children to the parent shard,
/// so that subsequent accesses to the State will use the parent shard's UId as a prefix for the database key.
fn set_state_shard_uid_mapping(
&mut self,
split_shard_event: &ReshardingSplitShardParams,
) -> io::Result<()> {
let mut store_update = self.store.trie_store().store_update();
let parent_shard_uid = split_shard_event.parent_shard;
// TODO(reshardingV3) No need to set the mapping for children shards that we won't track just after resharding?
for child_shard_uid in split_shard_event.children_shards() {
store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid);
}
store_update.commit()
}

/// Creates temporary memtries for new shards to be able to process them in the next epoch.
/// Note this doesn't complete memtries resharding, proper memtries are to be created later.
fn process_memtrie_resharding_storage_update(
&mut self,
mut chain_store_update: ChainStoreUpdate,
block: &Block,
parent_shard_uid: ShardUId,
tries: ShardTries,
split_shard_event: ReshardingSplitShardParams,
) -> Result<(), Error> {
let block_hash = block.hash();
let block_height = block.header().height();
let _span = tracing::debug_span!(
target: "resharding", "process_memtrie_resharding_storage_update",
?block_hash, block_height, ?parent_shard_uid)
.entered();

// TODO(resharding): what if node doesn't have memtrie? just pause
// processing?
// TODO(resharding): fork handling. if epoch is finalized on different
// blocks, the second finalization will crash.
tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?;

let chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
let boundary_account = split_shard_event.boundary_account;

let mut trie_store_update = self.store.store_update();
Expand All @@ -126,7 +192,7 @@ impl ReshardingManager {
"Memtrie not loaded. Cannot process memtrie resharding storage
update for block {:?}, shard {:?}",
block_hash,
shard_uid
parent_shard_uid,
);
return Err(Error::Other("Memtrie not loaded".to_string()));
};
Expand Down
3 changes: 1 addition & 2 deletions core/store/src/adapter/trie_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ impl<'a> TrieStoreUpdateAdapter<'a> {

/// Set the mapping from `child_shard_uid` to `parent_shard_uid`.
/// Used by Resharding V3 for State mapping.
#[cfg(test)]
fn set_shard_uid_mapping(&mut self, child_shard_uid: ShardUId, parent_shard_uid: ShardUId) {
pub fn set_shard_uid_mapping(&mut self, child_shard_uid: ShardUId, parent_shard_uid: ShardUId) {
self.store_update.set(
DBCol::StateShardUIdMapping,
child_shard_uid.to_bytes().as_ref(),
Expand Down
49 changes: 44 additions & 5 deletions integration-tests/src/test_loop/tests/resharding_v3.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use borsh::BorshDeserialize;
use itertools::Itertools;
use near_async::test_loop::data::TestLoopData;
use near_async::time::Duration;
use near_chain::ChainStoreAccess;
use near_chain_configs::test_genesis::TestGenesisBuilder;
use near_client::Client;
use near_o11y::testonly::init_test_logger;
use near_primitives::epoch_manager::EpochConfigStore;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout};
use near_primitives::state_record::StateRecord;
use near_primitives::types::{AccountId, ShardId};
use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};
use near_store::ShardUId;
use near_store::adapter::StoreAdapter;
use near_store::db::refcount::decode_value_with_rc;
use near_store::{DBCol, ShardUId};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

Expand Down Expand Up @@ -59,6 +64,37 @@ fn print_and_assert_shard_accounts(client: &Client) {
}
}

/// Asserts that all parent shard State is accessible via parent and children shards.
fn check_state_shard_uid_mapping_after_resharding(client: &Client, parent_shard_uid: ShardUId) {
    let tip = client.chain.head().unwrap();
    let epoch_id = tip.epoch_id;
    let epoch_config = client.epoch_manager.get_epoch_config(&epoch_id).unwrap();
    let children_shard_uids =
        epoch_config.shard_layout.get_children_shards_uids(parent_shard_uid.shard_id()).unwrap();
    assert_eq!(children_shard_uids.len(), 2);

    let trie_store = client.chain.chain_store.store().trie_store();
    for item in trie_store.store().iter_raw_bytes(DBCol::State) {
        let (key, raw_value) = item.unwrap();
        let row_shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap();
        // Just after resharding, no State data must be keyed using children ShardUIds.
        assert!(!children_shard_uids.contains(&row_shard_uid));
        if row_shard_uid == parent_shard_uid {
            let node_hash = CryptoHash::try_from_slice(&key[8..]).unwrap();
            let (value, _rc) = decode_value_with_rc(&raw_value);
            let expected = value.unwrap();
            // Parent shard data must still be accessible using parent ShardUId.
            assert_eq!(&trie_store.get(parent_shard_uid, &node_hash).unwrap()[..], expected);
            // All parent shard data is available via both children shards.
            for child_shard_uid in &children_shard_uids {
                assert_eq!(&trie_store.get(*child_shard_uid, &node_hash).unwrap()[..], expected);
            }
        }
    }
}

/// Base setup to check sanity of Resharding V3.
/// TODO(#11881): add the following scenarios:
/// - Nodes must not track all shards. State sync must succeed.
Expand Down Expand Up @@ -100,7 +136,8 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Ran
base_epoch_config.shard_layout = ShardLayout::v1(vec!["account3".parse().unwrap()], None, 3);
let base_shard_layout = base_epoch_config.shard_layout.clone();
let mut epoch_config = base_epoch_config.clone();
epoch_config.shard_layout = create_new_shard_layout(&base_shard_layout, "account6");
let boundary_account = "account6";
epoch_config.shard_layout = create_new_shard_layout(&base_shard_layout, boundary_account);
let expected_num_shards = epoch_config.shard_layout.shard_ids().count();
let epoch_config_store = EpochConfigStore::test(BTreeMap::from_iter(vec![
(base_protocol_version, Arc::new(base_epoch_config)),
Expand All @@ -110,7 +147,7 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Ran
let mut genesis_builder = TestGenesisBuilder::new();
genesis_builder
.genesis_time_from_clock(&builder.clock())
.shard_layout(base_shard_layout)
.shard_layout(base_shard_layout.clone())
.protocol_version(base_protocol_version)
.epoch_length(epoch_length)
.validators_desired_roles(&block_and_chunk_producers, &[]);
Expand All @@ -127,6 +164,8 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Ran
.track_all_shards()
.build();

let boundary_account_id: AccountId = boundary_account.parse().unwrap();
let parent_shard_uid = account_id_to_shard_uid(&boundary_account_id, &base_shard_layout);
let client_handle = node_datas[0].client_sender.actor_handle();
let latest_block_height = std::cell::Cell::new(0u64);
let success_condition = |test_loop_data: &mut TestLoopData| -> bool {
Expand Down Expand Up @@ -160,7 +199,7 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Ran

println!("State after resharding:");
print_and_assert_shard_accounts(client);

check_state_shard_uid_mapping_after_resharding(&client, parent_shard_uid);
return true;
};

Expand Down

0 comments on commit 2747631

Please sign in to comment.