
feat(resharding): Resharding state mapping integration #12269

Merged · 6 commits · Oct 29, 2024
2 changes: 1 addition & 1 deletion chain/chain/src/chain.rs
@@ -2029,7 +2029,7 @@ impl Chain {

if need_storage_update {
// TODO(resharding): consider adding to catchup flow.
-            self.resharding_manager.process_memtrie_resharding_storage_update(
+            self.resharding_manager.start_resharding(
self.chain_store.store_update(),
&block,
shard_uid,
6 changes: 6 additions & 0 deletions chain/chain/src/resharding/event_type.rs
@@ -32,6 +32,12 @@ pub struct ReshardingSplitShardParams {
pub prev_block_hash: CryptoHash,
}

impl ReshardingSplitShardParams {
pub fn children_shards(&self) -> Vec<ShardUId> {
vec![self.left_child_shard, self.right_child_shard]
}
}

impl ReshardingEventType {
/// Takes as input a [ShardLayout] definition and deduces which kind of resharding operation
/// must be performed.
110 changes: 88 additions & 22 deletions chain/chain/src/resharding/manager.rs
@@ -1,6 +1,7 @@
use std::io;
use std::sync::Arc;

-use super::event_type::ReshardingEventType;
+use super::event_type::{ReshardingEventType, ReshardingSplitShardParams};
use super::types::ReshardingSender;
use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController};
use crate::types::RuntimeAdapter;
@@ -13,7 +14,7 @@ use near_primitives::challenge::PartialState;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{get_block_shard_uid, ShardLayout};
use near_primitives::types::chunk_extra::ChunkExtra;
-use near_store::adapter::StoreUpdateAdapter;
+use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::trie::mem::resharding::RetainMode;
use near_store::{DBCol, PartialStorage, ShardTries, ShardUId, Store};

@@ -49,21 +50,18 @@ impl ReshardingManager {
Self { store, epoch_manager, resharding_config, flat_storage_resharder, resharding_handle }
}

-    /// If shard layout changes after the given block, creates temporary
-    /// memtries for new shards to be able to process them in the next epoch.
-    /// Note this doesn't complete resharding, proper memtries are to be
-    /// created later.
-    pub fn process_memtrie_resharding_storage_update(
+    /// Trigger resharding if shard layout changes after the given block.
+    pub fn start_resharding(
[Contributor] Nice... thanks for renaming!

&mut self,
-        mut chain_store_update: ChainStoreUpdate,
+        chain_store_update: ChainStoreUpdate,
block: &Block,
shard_uid: ShardUId,
tries: ShardTries,
) -> Result<(), Error> {
let block_hash = block.hash();
let block_height = block.header().height();
let _span = tracing::debug_span!(
target: "resharding", "process_memtrie_resharding_storage_update",
target: "resharding", "start_resharding",
?block_hash, block_height, ?shard_uid)
.entered();

@@ -84,34 +82,102 @@ impl ReshardingManager {
tracing::debug!(target: "resharding", ?next_shard_layout, "next shard layout is not v2, skipping");
return Ok(());
}

let resharding_event_type =
ReshardingEventType::from_shard_layout(&next_shard_layout, *block_hash, *prev_hash)?;
-        let Some(ReshardingEventType::SplitShard(split_shard_event)) = resharding_event_type else {
-            tracing::debug!(target: "resharding", ?resharding_event_type, "resharding event type is not split shard, skipping");
-            return Ok(());
+        match resharding_event_type {
+            Some(ReshardingEventType::SplitShard(split_shard_event)) => {
+                self.split_shard(
+                    chain_store_update,
+                    block,
+                    shard_uid,
+                    tries,
+                    split_shard_event,
+                    next_shard_layout,
+                )?;
+            }
+            None => {
+                tracing::warn!(target: "resharding", ?resharding_event_type, "unsupported resharding event type, skipping");
+            }
         };
+        Ok(())
}

fn split_shard(
&mut self,
chain_store_update: ChainStoreUpdate,
block: &Block,
shard_uid: ShardUId,
tries: ShardTries,
split_shard_event: ReshardingSplitShardParams,
next_shard_layout: ShardLayout,
) -> Result<(), Error> {
if split_shard_event.parent_shard != shard_uid {
let parent_shard = split_shard_event.parent_shard;
tracing::debug!(target: "resharding", ?parent_shard, "shard uid does not match event parent shard, skipping");
tracing::debug!(target: "resharding", ?parent_shard, "ShardUId does not match event parent shard, skipping");
return Ok(());
}

-        // TODO(resharding): what if node doesn't have memtrie? just pause
-        // processing?
-        // TODO(resharding): fork handling. if epoch is finalized on different
-        // blocks, the second finalization will crash.
-        tries.freeze_mem_tries(
+        // Reshard the State column by setting ShardUId mapping from children to parent.
+        self.set_state_shard_uid_mapping(&split_shard_event)?;
+
+        // Create temporary children memtries by freezing parent memtrie and referencing it.
+        self.process_memtrie_resharding_storage_update(
+            chain_store_update,
+            block,
             shard_uid,
-            vec![split_shard_event.left_child_shard, split_shard_event.right_child_shard],
+            tries,
+            split_shard_event.clone(),
         )?;

+        // Trigger resharding of flat storage.
         self.flat_storage_resharder.start_resharding(
-            ReshardingEventType::SplitShard(split_shard_event.clone()),
+            ReshardingEventType::SplitShard(split_shard_event),
&next_shard_layout,
)?;
[Contributor, on lines +121 to 137] I really like how this is shaping up - split_shard consists of splitting the State, FlatState and MemTrie.


-        let chunk_extra = self.get_chunk_extra(block_hash, &shard_uid)?;
Ok(())
}

/// Store in the database the mapping of ShardUId from children to the parent shard,
/// so that subsequent accesses to the State will use the parent shard's UId as a prefix for the database key.
fn set_state_shard_uid_mapping(
&mut self,
split_shard_event: &ReshardingSplitShardParams,
) -> io::Result<()> {
let mut store_update = self.store.trie_store().store_update();
let parent_shard_uid = split_shard_event.parent_shard;
// TODO(reshardingV3) No need to set the mapping for children shards that we won't track just after resharding?
[Contributor] Good call making that a todo. Let's come back to this question when we figure out the full picture for post resharding cleanup.

for child_shard_uid in split_shard_event.children_shards() {
store_update.set_shard_uid_mapping(child_shard_uid, parent_shard_uid);
}
store_update.commit()
}
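
For intuition, here is roughly what this mapping buys on the read path: a child's ShardUId is swapped for the parent's before the DBCol::State key is assembled, so children can read the parent's data without copying anything. The sketch below is a minimal standalone illustration, not the actual near_store adapter code; `ShardUIdBytes`, `MockStore`, and the helper names are made-up stand-ins:

```rust
use std::collections::HashMap;

// Illustrative stand-ins: an 8-byte ShardUId and a 32-byte trie node hash.
type ShardUIdBytes = [u8; 8];
type NodeHash = [u8; 32];

struct MockStore {
    // Stand-in for DBCol::StateShardUIdMapping: child ShardUId -> parent ShardUId.
    shard_uid_mapping: HashMap<ShardUIdBytes, ShardUIdBytes>,
    // Stand-in for DBCol::State, keyed by ShardUId prefix + node hash.
    state: HashMap<Vec<u8>, Vec<u8>>,
}

impl MockStore {
    // Children created by a split resolve to the parent's ShardUId;
    // any shard without a mapping entry resolves to itself.
    fn resolve_shard_uid(&self, shard_uid: ShardUIdBytes) -> ShardUIdBytes {
        *self.shard_uid_mapping.get(&shard_uid).unwrap_or(&shard_uid)
    }

    // Reads through a child ShardUId land on the same rows as the parent,
    // because the mapped ShardUId is used as the key prefix.
    fn get_state(&self, shard_uid: ShardUIdBytes, node_hash: NodeHash) -> Option<&[u8]> {
        let prefix = self.resolve_shard_uid(shard_uid);
        let mut key = Vec::with_capacity(40);
        key.extend_from_slice(&prefix);
        key.extend_from_slice(&node_hash);
        self.state.get(&key).map(Vec::as_slice)
    }
}
```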

/// Creates temporary memtries for new shards to be able to process them in the next epoch.
/// Note this doesn't complete memtries resharding, proper memtries are to be created later.
fn process_memtrie_resharding_storage_update(
&mut self,
mut chain_store_update: ChainStoreUpdate,
block: &Block,
parent_shard_uid: ShardUId,
tries: ShardTries,
split_shard_event: ReshardingSplitShardParams,
) -> Result<(), Error> {
let block_hash = block.hash();
let block_height = block.header().height();
let _span = tracing::debug_span!(
target: "resharding", "process_memtrie_resharding_storage_update",
?block_hash, block_height, ?parent_shard_uid)
.entered();

// TODO(resharding): what if node doesn't have memtrie? just pause
// processing?
// TODO(resharding): fork handling. if epoch is finalized on different
// blocks, the second finalization will crash.
tries.freeze_mem_tries(parent_shard_uid, split_shard_event.children_shards())?;

let chunk_extra = self.get_chunk_extra(block_hash, &parent_shard_uid)?;
let boundary_account = split_shard_event.boundary_account;

let mut trie_store_update = self.store.store_update();
@@ -126,7 +192,7 @@ impl ReshardingManager {
"Memtrie not loaded. Cannot process memtrie resharding storage
update for block {:?}, shard {:?}",
block_hash,
-                shard_uid
+                parent_shard_uid,
);
return Err(Error::Other("Memtrie not loaded".to_string()));
};
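The temporary children memtries created above can be pictured as follows. This is a conceptual sketch under the assumption that a frozen memtrie is an immutable snapshot shared by reference; `FrozenMemTrie`, `MemTrie::Hybrid`, and `freeze_and_share` are illustrative stand-ins rather than nearcore's real types:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Immutable, shareable snapshot of the parent shard's in-memory trie.
struct FrozenMemTrie;

enum MemTrie {
    // A normal, writable in-memory trie.
    Owned,
    // A temporary child trie whose reads go through the frozen parent,
    // until a proper per-child memtrie is built later in resharding.
    Hybrid(Arc<FrozenMemTrie>),
}

// Freeze the parent's memtrie and hand each child a shared reference to it.
fn freeze_and_share(
    memtries: &mut HashMap<u64, MemTrie>, // shard id -> memtrie
    parent: u64,
    children: &[u64],
) {
    // Freezing consumes the parent's writable trie.
    memtries.remove(&parent);
    let frozen = Arc::new(FrozenMemTrie);
    for child in children {
        memtries.insert(*child, MemTrie::Hybrid(Arc::clone(&frozen)));
    }
}
```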
3 changes: 1 addition & 2 deletions core/store/src/adapter/trie_store.rs
@@ -160,8 +160,7 @@ impl<'a> TrieStoreUpdateAdapter<'a> {

/// Set the mapping from `child_shard_uid` to `parent_shard_uid`.
/// Used by Resharding V3 for State mapping.
-    #[cfg(test)]
-    fn set_shard_uid_mapping(&mut self, child_shard_uid: ShardUId, parent_shard_uid: ShardUId) {
+    pub fn set_shard_uid_mapping(&mut self, child_shard_uid: ShardUId, parent_shard_uid: ShardUId) {
self.store_update.set(
DBCol::StateShardUIdMapping,
child_shard_uid.to_bytes().as_ref(),
49 changes: 44 additions & 5 deletions integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -1,15 +1,20 @@
use borsh::BorshDeserialize;
use itertools::Itertools;
use near_async::test_loop::data::TestLoopData;
use near_async::time::Duration;
use near_chain::ChainStoreAccess;
use near_chain_configs::test_genesis::TestGenesisBuilder;
use near_client::Client;
use near_o11y::testonly::init_test_logger;
use near_primitives::epoch_manager::EpochConfigStore;
-use near_primitives::shard_layout::ShardLayout;
+use near_primitives::hash::CryptoHash;
+use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout};
use near_primitives::state_record::StateRecord;
use near_primitives::types::{AccountId, ShardId};
use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};
-use near_store::ShardUId;
+use near_store::adapter::StoreAdapter;
+use near_store::db::refcount::decode_value_with_rc;
+use near_store::{DBCol, ShardUId};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

@@ -59,6 +64,37 @@ fn print_and_assert_shard_accounts(client: &Client) {
}
}

/// Asserts that all parent shard State is accessible via parent and children shards.
fn check_state_shard_uid_mapping_after_resharding(client: &Client, parent_shard_uid: ShardUId) {
let tip = client.chain.head().unwrap();
let epoch_id = tip.epoch_id;
let epoch_config = client.epoch_manager.get_epoch_config(&epoch_id).unwrap();
let children_shard_uids =
epoch_config.shard_layout.get_children_shards_uids(parent_shard_uid.shard_id()).unwrap();
assert_eq!(children_shard_uids.len(), 2);

let store = client.chain.chain_store.store().trie_store();
for kv in store.store().iter_raw_bytes(DBCol::State) {
let (key, value) = kv.unwrap();
let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap();
// Just after resharding, no State data must be keyed using children ShardUIds.
assert!(!children_shard_uids.contains(&shard_uid));
if shard_uid != parent_shard_uid {
continue;
}
let node_hash = CryptoHash::try_from_slice(&key[8..]).unwrap();
let (value, _) = decode_value_with_rc(&value);
let parent_value = store.get(parent_shard_uid, &node_hash);
// Parent shard data must still be accessible using parent ShardUId.
assert_eq!(&parent_value.unwrap()[..], value.unwrap());
// All parent shard data is available via both children shards.
for child_shard_uid in &children_shard_uids {
let child_value = store.get(*child_shard_uid, &node_hash);
assert_eq!(&child_value.unwrap()[..], value.unwrap());
}
}
}
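
The helper relies on the DBCol::State key layout that the slicing above assumes: an 8-byte ShardUId prefix followed by the 32-byte trie node hash. A small sketch of that layout, where `make_state_key` and `parse_state_key` are hypothetical helpers for illustration:

```rust
// Build a DBCol::State key: 8-byte ShardUId prefix + 32-byte node hash.
fn make_state_key(shard_uid: [u8; 8], node_hash: [u8; 32]) -> [u8; 40] {
    let mut key = [0u8; 40];
    key[..8].copy_from_slice(&shard_uid);
    key[8..].copy_from_slice(&node_hash);
    key
}

// Split a raw key back into its ShardUId prefix and node hash,
// mirroring the key[0..8] / key[8..] slicing in the test above.
fn parse_state_key(key: &[u8; 40]) -> ([u8; 8], [u8; 32]) {
    (key[..8].try_into().unwrap(), key[8..].try_into().unwrap())
}
```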

/// Base setup to check sanity of Resharding V3.
/// TODO(#11881): add the following scenarios:
/// - Nodes must not track all shards. State sync must succeed.
@@ -100,7 +136,8 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Ran
base_epoch_config.shard_layout = ShardLayout::v1(vec!["account3".parse().unwrap()], None, 3);
let base_shard_layout = base_epoch_config.shard_layout.clone();
let mut epoch_config = base_epoch_config.clone();
-    epoch_config.shard_layout = create_new_shard_layout(&base_shard_layout, "account6");
+    let boundary_account = "account6";
+    epoch_config.shard_layout = create_new_shard_layout(&base_shard_layout, boundary_account);
let expected_num_shards = epoch_config.shard_layout.shard_ids().count();
let epoch_config_store = EpochConfigStore::test(BTreeMap::from_iter(vec![
(base_protocol_version, Arc::new(base_epoch_config)),
@@ -110,7 +147,7 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Ran
let mut genesis_builder = TestGenesisBuilder::new();
genesis_builder
.genesis_time_from_clock(&builder.clock())
-        .shard_layout(base_shard_layout)
+        .shard_layout(base_shard_layout.clone())
.protocol_version(base_protocol_version)
.epoch_length(epoch_length)
.validators_desired_roles(&block_and_chunk_producers, &[]);
@@ -127,6 +164,8 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Ran
.track_all_shards()
.build();

let boundary_account_id: AccountId = boundary_account.parse().unwrap();
let parent_shard_uid = account_id_to_shard_uid(&boundary_account_id, &base_shard_layout);
let client_handle = node_datas[0].client_sender.actor_handle();
let latest_block_height = std::cell::Cell::new(0u64);
let success_condition = |test_loop_data: &mut TestLoopData| -> bool {
@@ -160,7 +199,7 @@ fn test_resharding_v3_base(chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Ran

println!("State after resharding:");
print_and_assert_shard_accounts(client);

+        check_state_shard_uid_mapping_after_resharding(&client, parent_shard_uid);
return true;
};
