diff --git a/Cargo.lock b/Cargo.lock index d1886e9972f..ad0b100fb66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7444,6 +7444,7 @@ version = "0.0.0" dependencies = [ "borsh 1.2.0", "clap", + "near-async", "near-chain", "near-chain-configs", "near-chain-primitives", diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 4616fbee98a..d2f816294e2 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -10,6 +10,7 @@ use crate::missing_chunks::MissingChunksPool; use crate::orphan::{Orphan, OrphanBlockPool}; use crate::rayon_spawner::RayonAsyncComputationSpawner; use crate::resharding::manager::ReshardingManager; +use crate::resharding::types::ReshardingSender; use crate::sharding::shuffle_receipt_proofs; use crate::state_request_tracker::StateRequestTracker; use crate::state_snapshot_actor::SnapshotCallbacks; @@ -39,6 +40,7 @@ use crossbeam_channel::{unbounded, Receiver, Sender}; use itertools::Itertools; use lru::LruCache; use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt}; +use near_async::messaging::{noop, IntoMultiSender}; use near_async::time::{Clock, Duration, Instant}; use near_chain_configs::{MutableConfigValue, MutableValidatorSigner}; use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError}; @@ -361,7 +363,9 @@ impl Chain { let resharding_manager = ReshardingManager::new( store.clone(), epoch_manager.clone(), + runtime_adapter.clone(), MutableConfigValue::new(Default::default(), "resharding_config"), + noop().into_multi_sender(), ); Ok(Chain { clock: clock.clone(), @@ -401,6 +405,7 @@ impl Chain { snapshot_callbacks: Option<SnapshotCallbacks>, apply_chunks_spawner: Arc<dyn AsyncComputationSpawner>, validator: MutableValidatorSigner, + resharding_sender: ReshardingSender, ) -> Result<Chain, Error> { let state_roots = get_genesis_state_roots(runtime_adapter.store())?
.expect("genesis should be initialized."); @@ -537,7 +542,9 @@ impl Chain { let resharding_manager = ReshardingManager::new( chain_store.store().clone(), epoch_manager.clone(), + runtime_adapter.clone(), chain_config.resharding_config, + resharding_sender, ); Ok(Chain { clock: clock.clone(), diff --git a/chain/chain/src/flat_storage_creator.rs b/chain/chain/src/flat_storage_creator.rs index 1f190f44cad..9fa30f41692 100644 --- a/chain/chain/src/flat_storage_creator.rs +++ b/chain/chain/src/flat_storage_creator.rs @@ -459,7 +459,7 @@ impl FlatStorageCreator { epoch_manager: &Arc, flat_storage_manager: &FlatStorageManager, runtime: &Arc, - _flat_storage_resharder: &FlatStorageResharder, + flat_storage_resharder: &FlatStorageResharder, ) -> Result, Error> { let epoch_id = &chain_head.epoch_id; tracing::debug!(target: "store", ?epoch_id, "creating flat storage for the current epoch"); @@ -486,9 +486,8 @@ impl FlatStorageCreator { ); } FlatStorageStatus::Disabled => {} - FlatStorageStatus::Resharding(_status) => { - // TODO(Trisfald): call resume - // flat_storage_resharder.resume(shard_uid, &status, ...)?; + FlatStorageStatus::Resharding(status) => { + flat_storage_resharder.resume(shard_uid, &status)?; } } } diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs index f0073bee6cc..4f017e1f3c3 100644 --- a/chain/chain/src/flat_storage_resharder.rs +++ b/chain/chain/src/flat_storage_resharder.rs @@ -4,14 +4,15 @@ use std::sync::{Arc, Mutex}; -use crossbeam_channel::{Receiver, Sender}; use near_chain_configs::ReshardingHandle; use near_chain_primitives::Error; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; use crate::resharding::event_type::{ReshardingEventType, ReshardingSplitShardParams}; +use crate::resharding::types::FlatStorageSplitShardRequest; use crate::types::RuntimeAdapter; +use near_async::messaging::Sender; use near_primitives::shard_layout::{account_id_to_shard_id, ShardLayout}; use near_primitives::state::FlatStateValue; use near_primitives::trie_key::col::{self, ALL_COLUMNS_WITH_NAMES}; @@ -28,6 +29,7 @@ use near_store::flat::{ SplittingParentStatus, }; use near_store::{ShardUId, StorageError}; +use std::fmt::{Debug, Formatter}; /// `FlatStorageResharder` takes care of updating flat storage when a resharding event happens. /// @@ -42,8 +44,7 @@ use near_store::{ShardUId, StorageError}; /// Ready state. The parent shard storage is not needed anymore and can be removed. /// /// The resharder has also the following properties: -/// - Background processing: the bulk of resharding is done in a separate task, see -/// [FlatStorageResharderScheduler] +/// - Background processing: the bulk of resharding is done in a separate task. /// - Interruptible: a reshard operation can be cancelled through a /// [FlatStorageResharderController]. /// - In the case of event `Split` the state of flat storage will go back to what it was @@ -52,13 +53,24 @@ use near_store::{ShardUId, StorageError}; pub struct FlatStorageResharder { runtime: Arc, resharding_event: Arc>>, + scheduler: Sender, + pub controller: FlatStorageResharderController, } impl FlatStorageResharder { /// Creates a new `FlatStorageResharder`. 
- pub fn new(runtime: Arc<dyn RuntimeAdapter>) -> Self { + /// + /// # Args: + /// * `runtime`: runtime adapter + /// * `scheduler`: component used to schedule the background tasks + /// * `controller`: manages the execution of the background tasks + pub fn new( + runtime: Arc<dyn RuntimeAdapter>, + scheduler: Sender<FlatStorageSplitShardRequest>, + controller: FlatStorageResharderController, + ) -> Self { let resharding_event = Arc::new(Mutex::new(None)); - Self { runtime, resharding_event } + Self { runtime, resharding_event, scheduler, controller } } /// Starts a resharding event. @@ -68,19 +80,13 @@ impl FlatStorageResharder { /// # Args: /// * `event_type`: the type of resharding event /// * `shard_layout`: the new shard layout - /// * `scheduler`: component used to schedule the background tasks - /// * `controller`: manages the execution of the background tasks pub fn start_resharding( &self, event_type: ReshardingEventType, shard_layout: &ShardLayout, - scheduler: &dyn FlatStorageResharderScheduler, - controller: FlatStorageResharderController, ) -> Result<(), Error> { match event_type { - ReshardingEventType::SplitShard(params) => { - self.split_shard(params, shard_layout, scheduler, controller) - } + ReshardingEventType::SplitShard(params) => self.split_shard(params, shard_layout), } } @@ -91,14 +97,10 @@ impl FlatStorageResharder { /// # Args: /// * `shard_uid`: UId of the shard /// * `status`: resharding status of the shard - /// * `scheduler`: component used to schedule the background tasks - /// * `controller`: manages the execution of the background tasks pub fn resume( &self, shard_uid: ShardUId, status: &FlatStorageReshardingStatus, ) -> Result<(), Error> { match status { FlatStorageReshardingStatus::CreatingChild => { @@ -112,7 +114,7 @@ impl FlatStorageResharder { // However, we don't know the current state of children shards, // so it's better to clean them. self.clean_children_shards(&status)?; - self.schedule_split_shard(parent_shard_uid, &status, scheduler, controller); + self.schedule_split_shard(parent_shard_uid, &status); } FlatStorageReshardingStatus::CatchingUp(_) => { info!(target: "resharding", ?shard_uid, ?status, "resuming flat storage shard catchup"); @@ -128,8 +130,6 @@ &self, split_params: ReshardingSplitShardParams, shard_layout: &ShardLayout, - scheduler: &dyn FlatStorageResharderScheduler, - controller: FlatStorageResharderController, ) -> Result<(), Error> { let ReshardingSplitShardParams { parent_shard, @@ -170,7 +170,7 @@ ); store_update.commit()?; - self.schedule_split_shard(parent_shard, &status, scheduler, controller); + self.schedule_split_shard(parent_shard, &status); Ok(()) } @@ -195,20 +195,12 @@ } /// Schedules a task to split a shard.
- fn schedule_split_shard( - &self, - parent_shard: ShardUId, - status: &SplittingParentStatus, - scheduler: &dyn FlatStorageResharderScheduler, - controller: FlatStorageResharderController, - ) { + fn schedule_split_shard(&self, parent_shard: ShardUId, status: &SplittingParentStatus) { let event = FlatStorageReshardingEventStatus::SplitShard(parent_shard, status.clone()); self.set_resharding_event(event); info!(target: "resharding", ?parent_shard, ?status, "scheduling flat storage shard split"); - let resharder = self.clone(); - let task = Box::new(move || split_shard_task(resharder, controller)); - scheduler.schedule(task); + let resharder = self.clone(); + self.scheduler.send(FlatStorageSplitShardRequest { resharder }); } /// Cleans up children shards flat storage's content (status is excluded). @@ -235,106 +227,164 @@ impl FlatStorageResharder { None => None, } } -} -/// Retrieves the flat head of the given `shard`. -/// The shard must be in [FlatStorageStatus::Ready] state otherwise this method returns an error. -fn retrieve_shard_flat_head(shard: ShardUId, store: &FlatStoreAdapter) -> Result<BlockInfo, Error> { - let status = - store.get_flat_storage_status(shard).map_err(|err| Into::<StorageError>::into(err))?; - if let FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }) = status { - Ok(flat_head) - } else { - let err_msg = "flat storage shard status is not ready!"; - error!(target: "resharding", ?shard, ?status, err_msg); - Err(Error::ReshardingError(err_msg.to_owned())) + /// Task to perform the actual split of a flat storage shard. This may be a long operation time-wise. + /// + /// Conceptually it simply copies each key-value pair from the parent shard to the correct child. + pub fn split_shard_task(&self) { + let task_status = self.split_shard_task_impl(); + self.split_shard_task_postprocessing(task_status); + info!(target: "resharding", ?task_status, "flat storage shard split task finished"); } -} -/// Task to perform the actual split of a flat storage shard. This may be a long operation time-wise. -/// -/// Conceptually it simply copies each key-value pair from the parent shard to the correct child. -fn split_shard_task(resharder: FlatStorageResharder, controller: FlatStorageResharderController) { - let task_status = split_shard_task_impl(resharder.clone(), controller.clone()); - split_shard_task_postprocessing(resharder, task_status); - info!(target: "resharding", ?task_status, "flat storage shard split task finished"); - if let Err(err) = controller.completion_sender.send(task_status) { - warn!(target: "resharding", ?err, "error notifying completion of flat storage shard split task") - }; -} + /// Performs the bulk of [split_shard_task]. + /// + /// Returns the task status, indicating whether the routine completed successfully, failed, or was cancelled. + fn split_shard_task_impl(&self) -> FlatStorageReshardingTaskStatus { + if self.controller.is_cancelled() { + return FlatStorageReshardingTaskStatus::Cancelled; + } -/// Performs the bulk of [split_shard_task]. -/// -/// Returns `true` if the routine completed successfully. -fn split_shard_task_impl( - resharder: FlatStorageResharder, - controller: FlatStorageResharderController, -) -> FlatStorageReshardingTaskStatus { - if controller.is_interrupted() { - return FlatStorageReshardingTaskStatus::Cancelled; - } + /// Determines after how many key-values the process stops to + /// commit changes and to check cancellation.
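+ /// A smaller batch commits more often and reacts faster to a cancellation request, + /// at the cost of more store operations; the exact value is a heuristic.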
+ const BATCH_SIZE: usize = 10_000; + + let (parent_shard, status) = self + .get_parent_shard_and_status() + .expect("flat storage resharding event must be Split!"); + info!(target: "resharding", ?parent_shard, ?status, "flat storage shard split task: starting key-values copy"); + + // Parent shard flat storage head must be on block height just before the new shard layout kicks + // in. This guarantees that all deltas have been applied and thus the state of all key-values is + // up to date. + // TODO(trisfald): do this check, maybe call update_flat_storage_for_shard + let _parent_flat_head = status.flat_head; + + // Prepare the store object for commits and the iterator over parent's flat storage. + let flat_store = self.runtime.store().flat_store(); + let mut iter = flat_store.iter(parent_shard); + + loop { + let mut store_update = flat_store.store_update(); + + // Process a `BATCH_SIZE` worth of key-value pairs. + let mut iter_exhausted = false; + for _ in 0..BATCH_SIZE { + match iter.next() { + Some(Ok((key, value))) => { + if let Err(err) = + shard_split_handle_key_value(key, value, &mut store_update, &status) + { + error!(target: "resharding", ?err, "failed to handle flat storage key"); + return FlatStorageReshardingTaskStatus::Failed; + } + } + Some(Err(err)) => { + error!(target: "resharding", ?err, "failed to read flat storage value from parent shard"); + return FlatStorageReshardingTaskStatus::Failed; + } + None => { + iter_exhausted = true; + } + } + } - /// Determines after how many key-values the process stops to - /// commit changes and to check interruptions. + // Pause to commit changes and check whether the routine should stop. + if let Err(err) = store_update.commit() { + error!(target: "resharding", ?err, "failed to commit store update"); + return FlatStorageReshardingTaskStatus::Failed; + } - const BATCH_SIZE: usize = 10_000; + // TODO(Trisfald): metrics and logs - let (parent_shard, status) = resharder - .get_parent_shard_and_status() - .expect("flat storage resharding event must be Split!"); - info!(target: "resharding", ?parent_shard, ?status, "flat storage shard split task: starting key-values copy"); + // If `iter` is exhausted we can exit after the store commit. + if iter_exhausted { + break; + } + if self.controller.is_cancelled() { + return FlatStorageReshardingTaskStatus::Cancelled; + } + } + FlatStorageReshardingTaskStatus::Successful + } - // Parent shard flat storage head must be on block height just before the new shard layout kicks - // in. This guarantees that all deltas have been applied and thus the state of all key-values is - // up to date. - // TODO(trisfald): do this check, maybe call update_flat_storage_for_shard - let _parent_flat_head = status.flat_head; - // Prepare the store object for commits and the iterator over parent's flat storage. - let flat_store = resharder.runtime.store().flat_store(); - let mut iter = flat_store.iter(parent_shard); + /// Performs post-processing of shard splitting after all key-values have been moved from parent to + /// children. `task_status` indicates whether or not the previous phase was successful. + fn split_shard_task_postprocessing(&self, task_status: FlatStorageReshardingTaskStatus) { + let (parent_shard, split_status) = self + .get_parent_shard_and_status() + .expect("flat storage resharding event must be Split!"); + let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, ..
} = + split_status; + let flat_store = self.runtime.store().flat_store(); + info!(target: "resharding", ?parent_shard, ?task_status, ?split_status, "flat storage shard split task: post-processing"); - loop { let mut store_update = flat_store.store_update(); - - // Process a `BATCH_SIZE` worth of key value pairs. - let mut iter_exhausted = false; - for _ in 0..BATCH_SIZE { - match iter.next() { - Some(Ok((key, value))) => { - if let Err(err) = - shard_split_handle_key_value(key, value, &mut store_update, &status) - { - error!(target: "resharding", ?err, "failed to handle flat storage key"); - return FlatStorageReshardingTaskStatus::Failed; - } + match task_status { + FlatStorageReshardingTaskStatus::Successful => { + // Split shard completed successfully. + // Parent flat storage can be deleted from the FlatStoreManager. + // If FlatStoreManager has no reference to the shard, delete it manually. + if !self + .runtime + .get_flat_storage_manager() + .remove_flat_storage_for_shard(parent_shard, &mut store_update) + .unwrap() + { + store_update.remove_flat_storage(parent_shard); } - Some(Err(err)) => { - error!(target: "resharding", ?err, "failed to read flat storage value from parent shard"); - return FlatStorageReshardingTaskStatus::Failed; + // Children must perform catchup. + for child_shard in [left_child_shard, right_child_shard] { + store_update.set_flat_storage_status( + child_shard, + FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( + flat_head.hash, + )), + ); } - None => { - iter_exhausted = true; + // TODO(trisfald): trigger catchup + } + FlatStorageReshardingTaskStatus::Failed + | FlatStorageReshardingTaskStatus::Cancelled => { + // We got an error or a cancellation request. + // Reset parent. + store_update.set_flat_storage_status( + parent_shard, + FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }), + ); + // Remove children shards leftovers. + for child_shard in [left_child_shard, right_child_shard] { + store_update.remove_flat_storage(child_shard); } } } + store_update.commit().unwrap(); + // Terminate the resharding event. + *self.resharding_event.lock().unwrap() = None; + } +} - // Make a pause to commit and check if the routine should stop. - if let Err(err) = store_update.commit() { - error!(target: "resharding", ?err, "failed to commit store update"); - return FlatStorageReshardingTaskStatus::Failed; - } - - // TODO(Trisfald): metrics and logs +impl Debug for FlatStorageResharder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FlatStorageResharder") + .field("event", &self.resharding_event()) + .field("controller", &self.controller) + .finish() + } +} - // If `iter`` is exhausted we can exit after the store commit. - if iter_exhausted { - break; - } - if controller.is_interrupted() { - return FlatStorageReshardingTaskStatus::Cancelled; - } +/// Retrieves the flat head of the given `shard`. +/// The shard must be in [FlatStorageStatus::Ready] state otherwise this method returns an error. 
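+/// The returned flat head is stored in [SplittingParentStatus] when a split begins, and the
+/// children later use that same block as the starting point for their catch-up.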
+fn retrieve_shard_flat_head(shard: ShardUId, store: &FlatStoreAdapter) -> Result<BlockInfo, Error> { + let status = + store.get_flat_storage_status(shard).map_err(|err| Into::<StorageError>::into(err))?; + if let FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }) = status { + Ok(flat_head) + } else { + let err_msg = "flat storage shard status is not ready!"; + error!(target: "resharding", ?shard, ?status, err_msg); + Err(Error::ReshardingError(err_msg.to_owned())) } - FlatStorageReshardingTaskStatus::Successful } /// Handles the inheritance of a key-value pair from parent shard to children shards. @@ -402,59 +452,6 @@ fn shard_split_handle_key_value( Ok(()) } -/// Performs post-processing of shard splitting after all key-values have been moved from parent to -/// children. `success` indicates whether or not the previous phase was successful. -fn split_shard_task_postprocessing( - resharder: FlatStorageResharder, - task_status: FlatStorageReshardingTaskStatus, -) { - let (parent_shard, split_status) = resharder - .get_parent_shard_and_status() - .expect("flat storage resharding event must be Split!"); - let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = split_status; - let flat_store = resharder.runtime.store().flat_store(); - info!(target: "resharding", ?parent_shard, ?task_status, ?split_status, "flat storage shard split task: post-processing"); - - let mut store_update = flat_store.store_update(); - match task_status { - FlatStorageReshardingTaskStatus::Successful => { - // Split shard completed successfully. - // Parent flat storage can be deleted from the FlatStoreManager. - resharder - .runtime - .get_flat_storage_manager() - .remove_flat_storage_for_shard(parent_shard, &mut store_update) - .unwrap(); - store_update.remove_flat_storage(parent_shard); - // Children must perform catchup. - for child_shard in [left_child_shard, right_child_shard] { - store_update.set_flat_storage_status( - child_shard, - FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp( - flat_head.hash, - )), - ); - } - // TODO(trisfald): trigger catchup - } - FlatStorageReshardingTaskStatus::Failed | FlatStorageReshardingTaskStatus::Cancelled => { - // We got an error or an interrupt request. - // Reset parent. - store_update.set_flat_storage_status( - parent_shard, - FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }), - ); - // Remove children shards leftovers. - for child_shard in [left_child_shard, right_child_shard] { - store_update.remove_flat_storage(child_shard); - } - } - } - store_update.commit().unwrap(); - // Terminate the resharding event. - *resharder.resharding_event.lock().unwrap() = None; -} /// Copies a key-value pair to the correct child shard by matching the account-id to the provided shard layout. fn copy_kv_to_child( status: &SplittingParentStatus, @@ -517,49 +514,37 @@ pub enum FlatStorageReshardingTaskStatus { Cancelled, } -/// Helps control the flat storage resharder operation. More specifically, -/// it has a way to know when the background task is done or to interrupt it. -#[derive(Clone)] +/// Helps control the flat storage resharder background operations. This struct wraps +/// [ReshardingHandle] and gives a clearer meaning to the request to stop any processing when applied to flat +/// storage. In flat storage resharding there's a slight difference between interrupt and cancel. +/// Interruption happens when the node crashes, whilst cancellation is an on-demand request.
An +/// interrupted flat storage resharding will resume on node restart; a cancelled one won't. +#[derive(Clone, Debug)] pub struct FlatStorageResharderController { - /// Resharding handle to control interruption. + /// Resharding handle to control cancellation. handle: ReshardingHandle, - /// This object will be used to signal when the background task is completed. - completion_sender: Sender<FlatStorageReshardingTaskStatus>, - /// Corresponding receiver for `completion_sender`. - pub completion_receiver: Receiver<FlatStorageReshardingTaskStatus>, } impl FlatStorageResharderController { /// Creates a new `FlatStorageResharderController` with its own handle. pub fn new() -> Self { - let (completion_sender, completion_receiver) = crossbeam_channel::bounded(1); let handle = ReshardingHandle::new(); - Self { handle, completion_sender, completion_receiver } + Self { handle } } pub fn from_resharding_handle(handle: ReshardingHandle) -> Self { - let (completion_sender, completion_receiver) = crossbeam_channel::bounded(1); - Self { handle, completion_sender, completion_receiver } - } - - pub fn handle(&self) -> &ReshardingHandle { - &self.handle + Self { handle } } - /// Returns whether or not background task is interrupted. - pub fn is_interrupted(&self) -> bool { + /// Returns whether or not the background task is cancelled. + pub fn is_cancelled(&self) -> bool { !self.handle.get() } } -/// Represent the capability of scheduling the background tasks spawned by flat storage resharding. -pub trait FlatStorageResharderScheduler { - fn schedule(&self, f: Box<dyn FnOnce()>); -} #[cfg(test)] mod tests { - use std::{cell::RefCell, collections::BTreeMap, time::Duration}; + use std::collections::BTreeMap; use near_async::time::Clock; use near_chain_configs::{Genesis, MutableConfigValue}; @@ -579,11 +564,12 @@ mod tests { }; use crate::{ - rayon_spawner::RayonAsyncComputationSpawner, runtime::NightshadeRuntime, - types::ChainConfig, Chain, ChainGenesis, DoomslugThresholdMode, + rayon_spawner::RayonAsyncComputationSpawner, resharding::types::ReshardingSender, + runtime::NightshadeRuntime, types::ChainConfig, Chain, ChainGenesis, DoomslugThresholdMode, }; use super::*; + use near_async::messaging::{CanSend, IntoMultiSender}; use near_crypto::{KeyType, PublicKey}; /// Shorthand to create account ID. @@ -593,28 +579,31 @@ mod tests { }; } + #[derive(Default)] struct TestScheduler {} - impl FlatStorageResharderScheduler for TestScheduler { - fn schedule(&self, f: Box<dyn FnOnce()>) { - f(); + impl CanSend<FlatStorageSplitShardRequest> for TestScheduler { + fn send(&self, msg: FlatStorageSplitShardRequest) { + msg.resharder.split_shard_task(); } } #[derive(Default)] struct DelayedScheduler { - callable: RefCell<Option<Box<dyn FnOnce()>>>, + test_scheduler: TestScheduler, + split_shard_request: Mutex<Option<FlatStorageSplitShardRequest>>, } impl DelayedScheduler { - fn call(&self) { - self.callable.take().unwrap()(); + fn call_split_shard_task(&self) { + let msg_guard = self.split_shard_request.lock().unwrap(); + self.test_scheduler.send(msg_guard.clone().unwrap()); } } - impl FlatStorageResharderScheduler for DelayedScheduler { - fn schedule(&self, f: Box<dyn FnOnce()>) { - *self.callable.borrow_mut() = Some(f); + impl CanSend<FlatStorageSplitShardRequest> for DelayedScheduler { + fn send(&self, msg: FlatStorageSplitShardRequest) { + *self.split_shard_request.lock().unwrap() = Some(msg); } } @@ -641,8 +630,11 @@ mod tests { ) } - /// Generic test setup. - fn create_fs_resharder(shard_layout: ShardLayout) -> (Chain, FlatStorageResharder) { + /// Generic test setup. It creates an instance of chain and a FlatStorageResharder.
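+ /// The resharder is cloned out of the chain's ReshardingManager, so these tests exercise the
+ /// same wiring that production code uses.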
+ fn create_chain_and_resharder( + shard_layout: ShardLayout, + resharding_sender: ReshardingSender, + ) -> (Chain, FlatStorageResharder) { let num_shards = shard_layout.shard_ids().count(); let genesis = Genesis::test_with_seeds( Clock::real(), @@ -663,16 +655,18 @@ mod tests { Clock::real(), epoch_manager, shard_tracker, - runtime.clone(), + runtime, &chain_genesis, DoomslugThresholdMode::NoApprovals, ChainConfig::test(), None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + resharding_sender, ) .unwrap(); - (chain, FlatStorageResharder::new(runtime)) + let resharder = chain.resharding_manager.flat_storage_resharder.clone(); + (chain, resharder) } /// Utility function to derive the resharding event type from chain and shard layout. @@ -693,44 +687,34 @@ mod tests { #[test] fn concurrent_reshardings_are_disallowed() { init_test_logger(); - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = DelayedScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); - let scheduler = DelayedScheduler::default(); let controller = FlatStorageResharderController::new(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); assert!(resharder - .start_resharding( - resharding_event_type.clone(), - &new_shard_layout, - &scheduler, - controller.clone() - ) + .start_resharding(resharding_event_type.clone(), &new_shard_layout) .is_ok()); - // Immediately interrupt the resharding. - controller.handle().stop(); + // Immediately cancel the resharding. + controller.handle.stop(); assert!(resharder.resharding_event().is_some()); - assert!(resharder - .start_resharding(resharding_event_type, &new_shard_layout, &scheduler, controller) - .is_err()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_err()); } /// Flat storage shard status should be set correctly upon starting a shard split. #[test] fn flat_storage_split_status_set() { init_test_logger(); - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = DelayedScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); - let scheduler = DelayedScheduler::default(); - let controller = FlatStorageResharderController::new(); let flat_store = resharder.runtime.store().flat_store(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); - assert!(resharder - .start_resharding(resharding_event_type, &new_shard_layout, &scheduler, controller) - .is_ok()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); let resharding_event = resharder.resharding_event(); match resharding_event.unwrap() { @@ -758,7 +742,8 @@ mod tests { #[test] fn resume_split_starts_from_clean_state() { init_test_logger(); - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = TestScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let flat_store = resharder.runtime.store().flat_store(); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); @@ -800,9 +785,7 @@ mod tests { store_update.commit().unwrap(); // Resume resharding. 
- let scheduler = TestScheduler {}; - let controller = FlatStorageResharderController::new(); - resharder.resume(parent_shard, &resharding_status, &scheduler, controller).unwrap(); + resharder.resume(parent_shard, &resharding_status).unwrap(); // Children should not contain the random keys written before. for child_shard in [left_child_shard, right_child_shard] { @@ -826,20 +809,12 @@ mod tests { fn simple_split_shard() { init_test_logger(); // Perform resharding. - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = TestScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); - let scheduler = TestScheduler {}; - let controller = FlatStorageResharderController::new(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); - assert!(resharder - .start_resharding( - resharding_event_type, - &new_shard_layout, - &scheduler, - controller.clone() - ) - .is_ok()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok()); // Check flat storages of children contain the correct accounts and access keys. let left_child = ShardUId { version: 3, shard_id: 2 }; @@ -868,12 +843,6 @@ mod tests { .get(right_child, &account_vv_access_key.to_vec()) .is_ok_and(|val| val.is_some())); - // Controller should signal that resharding ended. - assert_eq!( - controller.completion_receiver.recv_timeout(Duration::from_secs(1)), - Ok(FlatStorageReshardingTaskStatus::Successful) - ); - // Check final status of parent flat storage. let parent = ShardUId { version: 3, shard_id: 1 }; assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty)); @@ -897,38 +866,27 @@ mod tests { } #[test] - fn interrupt_split_shard() { + fn cancel_split_shard() { init_test_logger(); // Perform resharding. - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let scheduler = Arc::new(DelayedScheduler::default()); + let sender = scheduler.as_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); - let scheduler = DelayedScheduler::default(); - let controller = FlatStorageResharderController::new(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); - assert!(resharder - .start_resharding( - resharding_event_type, - &new_shard_layout, - &scheduler, - controller.clone() - ) - .is_ok()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok()); let (parent_shard, status) = resharder.get_parent_shard_and_status().unwrap(); let SplittingParentStatus { left_child_shard, right_child_shard, flat_head, .. } = status; - // Interrupt the task before it starts. - controller.handle().stop(); + // Cancel the task before it starts. + resharder.controller.handle.stop(); // Run the task. - scheduler.call(); + scheduler.call_split_shard_task(); // Check that resharding was effectively cancelled. let flat_store = resharder.runtime.store().flat_store(); - assert_eq!( - controller.completion_receiver.recv_timeout(Duration::from_secs(1)), - Ok(FlatStorageReshardingTaskStatus::Cancelled) - ); assert_eq!( flat_store.get_flat_storage_status(parent_shard), Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head })) @@ -945,10 +903,9 @@ mod tests { /// A shard can't be split if it isn't in ready state. 
#[test] fn reject_split_shard_if_parent_is_not_ready() { - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = TestScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); - let scheduler = TestScheduler {}; - let controller = FlatStorageResharderController::new(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); // Make flat storage of parent shard not ready. @@ -959,9 +916,7 @@ mod tests { store_update.commit().unwrap(); // Trigger resharding and it should fail. - assert!(resharder - .start_resharding(resharding_event_type, &new_shard_layout, &scheduler, controller) - .is_err()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_err()); } /// Verify that a shard can be split correctly even if its flat head is lagging behind the expected @@ -984,7 +939,8 @@ mod tests { #[test] fn split_shard_handle_account_id_keys() { init_test_logger(); - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = TestScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1051,14 +1007,7 @@ mod tests { store_update.commit().unwrap(); // Do resharding. - assert!(resharder - .start_resharding( - resharding_event_type, - &new_shard_layout, - &TestScheduler {}, - FlatStorageResharderController::new() - ) - .is_ok()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok()); // Check each child has the correct keys assigned to itself. for key in &account_mm_keys { @@ -1075,7 +1024,8 @@ mod tests { #[test] fn split_shard_handle_delayed_receipts() { init_test_logger(); - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = TestScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1103,14 +1053,7 @@ mod tests { store_update.commit().unwrap(); // Do resharding. - assert!(resharder - .start_resharding( - resharding_event_type, - &new_shard_layout, - &TestScheduler {}, - FlatStorageResharderController::new() - ) - .is_ok()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok()); // Check that flat storages of both children contain the delayed receipt. for child_shard in [left_child_shard, right_child_shard] { @@ -1129,7 +1072,8 @@ mod tests { #[test] fn split_shard_handle_promise_yield() { init_test_logger(); - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = TestScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1173,14 +1117,7 @@ mod tests { store_update.commit().unwrap(); // Do resharding. 
- assert!(resharder - .start_resharding( - resharding_event_type, - &new_shard_layout, - &TestScheduler {}, - FlatStorageResharderController::new() - ) - .is_ok()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok()); // Check that flat storages of both children contain the promise yield. for child_shard in [left_child_shard, right_child_shard] { @@ -1203,7 +1140,8 @@ mod tests { #[test] fn split_shard_handle_buffered_receipts() { init_test_logger(); - let (chain, resharder) = create_fs_resharder(simple_shard_layout()); + let sender = TestScheduler::default().into_multi_sender(); + let (chain, resharder) = create_chain_and_resharder(simple_shard_layout(), sender); let new_shard_layout = shard_layout_after_split(); let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout); let ReshardingSplitShardParams { @@ -1236,14 +1174,7 @@ mod tests { store_update.commit().unwrap(); // Do resharding. - assert!(resharder - .start_resharding( - resharding_event_type, - &new_shard_layout, - &TestScheduler {}, - FlatStorageResharderController::new() - ) - .is_ok()); + assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout,).is_ok()); // Check that only the first child contain the buffered receipt. assert_eq!( diff --git a/chain/chain/src/resharding/event_type.rs b/chain/chain/src/resharding/event_type.rs index b00b4b90ceb..34fbec3cdc4 100644 --- a/chain/chain/src/resharding/event_type.rs +++ b/chain/chain/src/resharding/event_type.rs @@ -26,34 +26,35 @@ pub struct ReshardingSplitShardParams { pub right_child_shard: ShardUId, /// The account at the boundary between the two children. pub boundary_account: AccountId, - /// Hash of the first block having the new shard layout. + /// Hash of the last block having the old shard layout. pub block_hash: CryptoHash, /// The block before `block_hash`. pub prev_block_hash: CryptoHash, } impl ReshardingEventType { - /// Takes as input a [ShardLayout] definition and deduces which kind of resharding operation must be - /// performed. + /// Takes as input a [ShardLayout] definition and deduces which kind of resharding operation + /// must be performed. /// /// # Args: - /// * `shard_layout`: the new shard layout - /// * `block_hash`: hash of the first block with `shard_layout` + /// * `next_shard_layout`: the new shard layout + /// * `block_hash`: hash of the last block with the shard layout before `next_shard_layout` /// * `prev_block_hash`: hash of the block preceding `block_hash` /// - /// Returns a [ReshardingEventType] if exactly one resharding change is contained in `shard_layout`, otherwise returns `None`. + /// Returns a [ReshardingEventType] if exactly one resharding change is contained in + /// `next_shard_layout`, otherwise returns `None`. pub fn from_shard_layout( - shard_layout: &ShardLayout, + next_shard_layout: &ShardLayout, block_hash: CryptoHash, prev_block_hash: CryptoHash, ) -> Result, Error> { let log_and_error = |err_msg: &str| { - error!(target: "resharding", ?shard_layout, err_msg); + error!(target: "resharding", ?next_shard_layout, err_msg); Err(Error::ReshardingError(err_msg.to_owned())) }; // Resharding V3 supports shard layout V2 onwards. 
- let (shards_split_map, boundary_accounts) = match shard_layout { + let (shards_split_map, boundary_accounts) = match next_shard_layout { ShardLayout::V0(_) | ShardLayout::V1(_) => { return log_and_error("unsupported shard layout!"); } @@ -77,16 +78,17 @@ } // Parent shard is no longer part of this shard layout. let parent_shard = ShardUId { - version: shard_layout.version(), + version: next_shard_layout.version(), shard_id: shard_id_as_u32(*parent_id), }; let left_child_shard = - ShardUId::from_shard_id_and_layout(children_ids[0], shard_layout); + ShardUId::from_shard_id_and_layout(children_ids[0], next_shard_layout); let right_child_shard = - ShardUId::from_shard_id_and_layout(children_ids[1], shard_layout); + ShardUId::from_shard_id_and_layout(children_ids[1], next_shard_layout); // Find the boundary account between the two children. - let Some(boundary_account_index) = - shard_layout.shard_ids().position(|id| id == left_child_shard.shard_id()) + let Some(boundary_account_index) = next_shard_layout + .shard_ids() + .position(|id| id == left_child_shard.shard_id()) else { return log_and_error(&format!( "shard {left_child_shard} not found in shard layout" diff --git a/chain/chain/src/resharding/manager.rs b/chain/chain/src/resharding/manager.rs index 9a999145ed8..fd0a0bd9c4e 100644 --- a/chain/chain/src/resharding/manager.rs +++ b/chain/chain/src/resharding/manager.rs @@ -1,6 +1,10 @@ use std::sync::Arc; use super::event_type::ReshardingEventType; +use super::types::ReshardingSender; +use crate::flat_storage_resharder::{FlatStorageResharder, FlatStorageResharderController}; +use crate::types::RuntimeAdapter; +use near_async::messaging::IntoSender; use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle}; use near_chain_primitives::Error; use near_epoch_manager::EpochManagerAdapter; @@ -23,15 +27,25 @@ pub struct ReshardingManager { /// A handle that allows the main process to interrupt resharding if needed. /// This typically happens when the main process is interrupted. pub resharding_handle: ReshardingHandle, + /// Takes care of performing resharding on the flat storage. + pub flat_storage_resharder: FlatStorageResharder, } impl ReshardingManager { pub fn new( store: Store, epoch_manager: Arc<dyn EpochManagerAdapter>, + runtime_adapter: Arc<dyn RuntimeAdapter>, resharding_config: MutableConfigValue<ReshardingConfig>, + resharding_sender: ReshardingSender, ) -> Self { - Self { store, epoch_manager, resharding_config, resharding_handle: ReshardingHandle::new() } + let resharding_handle = ReshardingHandle::new(); + let flat_storage_resharder = FlatStorageResharder::new( + runtime_adapter, + resharding_sender.into_sender(), + FlatStorageResharderController::from_resharding_handle(resharding_handle.clone()), + ); + Self { store, epoch_manager, resharding_config, flat_storage_resharder, resharding_handle } } /// If shard layout changes after the given block, creates temporary @@ -90,6 +104,12 @@ impl ReshardingManager { vec![split_shard_event.left_child_shard, split_shard_event.right_child_shard], )?; + // Trigger resharding of flat storage.
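+ // This call only schedules the work: the request goes through the ReshardingSender
+ // (backed by the ReshardingActor in production), and the key-value copy runs there
+ // as a background task.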
+ self.flat_storage_resharder.start_resharding( + ReshardingEventType::SplitShard(split_shard_event.clone()), + &next_shard_layout, + )?; + let chunk_extra = self.get_chunk_extra(block_hash, &shard_uid)?; let boundary_account = split_shard_event.boundary_account; diff --git a/chain/chain/src/resharding/mod.rs b/chain/chain/src/resharding/mod.rs index f3e8410acc6..989a945f137 100644 --- a/chain/chain/src/resharding/mod.rs +++ b/chain/chain/src/resharding/mod.rs @@ -1,5 +1,7 @@ pub mod event_type; pub mod manager; +pub mod resharding_actor; pub mod resharding_v2; +pub mod types; pub use resharding_v2 as v2; diff --git a/chain/chain/src/resharding/resharding_actor.rs b/chain/chain/src/resharding/resharding_actor.rs new file mode 100644 index 00000000000..8a8c0b34b36 --- /dev/null +++ b/chain/chain/src/resharding/resharding_actor.rs @@ -0,0 +1,23 @@ +use super::types::FlatStorageSplitShardRequest; +use near_async::messaging::{self, Handler}; + +/// Dedicated actor for resharding V3. +pub struct ReshardingActor {} + +impl messaging::Actor for ReshardingActor {} + +impl Handler<FlatStorageSplitShardRequest> for ReshardingActor { + fn handle(&mut self, msg: FlatStorageSplitShardRequest) { + self.handle_flat_storage_split_shard_request(msg); + } +} + +impl ReshardingActor { + pub fn new() -> Self { + Self {} + } + + pub fn handle_flat_storage_split_shard_request(&mut self, msg: FlatStorageSplitShardRequest) { + msg.resharder.split_shard_task(); + } +} diff --git a/chain/chain/src/resharding/types.rs b/chain/chain/src/resharding/types.rs new file mode 100644 index 00000000000..47ded965ee8 --- /dev/null +++ b/chain/chain/src/resharding/types.rs @@ -0,0 +1,19 @@ +use crate::flat_storage_resharder::FlatStorageResharder; +use near_async::messaging::Sender; + +/// Represents a request to start the split of a parent shard flat storage into two children flat +/// storages. +#[derive(actix::Message, Clone, Debug)] +#[rtype(result = "()")] +pub struct FlatStorageSplitShardRequest { + pub resharder: FlatStorageResharder, +} + +/// A multi-sender for the FlatStorageResharder post-processing API. +/// +/// This is meant to be used to send messages to handle the post-processing tasks needed for +/// resharding the flat storage. An example is splitting a shard.
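+/// The `MultiSend`/`MultiSenderFrom` derives below are what let call sites in this change build a
+/// `ReshardingSender` via `into_multi_sender()`/`as_multi_sender()`, from either a real actor
+/// handle or `noop()` where resharding is irrelevant.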
+#[derive(Clone, near_async::MultiSend, near_async::MultiSenderFrom)] +pub struct ReshardingSender { + pub flat_storage_split_shard_send: Sender, +} diff --git a/chain/chain/src/runtime/tests.rs b/chain/chain/src/runtime/tests.rs index 5286a78dc06..c18b7605748 100644 --- a/chain/chain/src/runtime/tests.rs +++ b/chain/chain/src/runtime/tests.rs @@ -43,6 +43,7 @@ use near_store::{get_genesis_state_roots, NodeStorage, PartialStorage}; use super::*; use crate::rayon_spawner::RayonAsyncComputationSpawner; +use near_async::messaging::{noop, IntoMultiSender}; use near_async::time::Clock; use near_primitives::trie_key::TrieKey; use primitive_types::U256; @@ -1648,6 +1649,7 @@ fn get_test_env_with_chain_and_pool() -> (TestEnv, Chain, TransactionPool) { None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap(); diff --git a/chain/chain/src/store_validator.rs b/chain/chain/src/store_validator.rs index 43744ce105d..8bceb0b043d 100644 --- a/chain/chain/src/store_validator.rs +++ b/chain/chain/src/store_validator.rs @@ -393,6 +393,7 @@ mod tests { use crate::{Chain, ChainGenesis, ChainStoreAccess, DoomslugThresholdMode}; use super::*; + use near_async::messaging::{noop, IntoMultiSender}; fn init() -> (Chain, StoreValidator) { let store = create_test_store(); @@ -419,6 +420,7 @@ mod tests { None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap(); ( diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index e9124bcfee9..566cfe9831a 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -32,6 +32,7 @@ use tracing::debug; pub use self::kv_runtime::{account_id_to_shard_id, KeyValueRuntime, MockEpochManager}; pub use self::validator_schedule::ValidatorSchedule; +use near_async::messaging::{noop, IntoMultiSender}; pub fn get_chain(clock: Clock) -> Chain { get_chain_with_epoch_length_and_num_shards(clock, 10, 1) @@ -76,6 +77,7 @@ pub fn get_chain_with_epoch_length_and_num_shards( None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap() } @@ -160,6 +162,7 @@ pub fn setup_with_tx_validity_period( None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap(); diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 39370ed160f..cfeeb06c73e 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -25,7 +25,6 @@ use near_chain::chain::{ VerifyBlockHashAndSignatureResult, }; use near_chain::flat_storage_creator::FlatStorageCreator; -use near_chain::flat_storage_resharder::FlatStorageResharder; use near_chain::orphan::OrphanMissingChunks; use near_chain::state_snapshot_actor::SnapshotCallbacks; use near_chain::test_utils::format_hash; @@ -91,6 +90,7 @@ use tracing::{debug, debug_span, error, info, instrument, trace, warn}; use crate::client_actor::AdvProduceChunksMode; use crate::sync::epoch::EpochSync; use crate::sync::state::chain_requests::ChainSenderForStateSync; +use near_chain::resharding::types::ReshardingSender; const NUM_REBROADCAST_BLOCKS: usize = 30; @@ -175,8 +175,8 @@ pub struct Client { /// Cached precomputed set of TIER1 accounts. /// See send_network_chain_info(). tier1_accounts_cache: Option<(EpochId, Arc)>, - /// Takes care of performing resharding on the flat storage. 
- pub flat_storage_resharder: FlatStorageResharder, + /// Resharding sender. + pub resharding_sender: ReshardingSender, /// Used when it is needed to create flat storage in background for some shards. flat_storage_creator: Option, /// A map storing the last time a block was requested for state sync. @@ -247,6 +247,7 @@ impl Client { snapshot_callbacks: Option, async_computation_spawner: Arc, partial_witness_adapter: PartialWitnessSenderForClient, + resharding_sender: ReshardingSender, state_sync_future_spawner: Arc, chain_sender_for_state_sync: ChainSenderForStateSync, ) -> Result { @@ -271,14 +272,14 @@ impl Client { snapshot_callbacks, async_computation_spawner.clone(), validator_signer.clone(), + resharding_sender.clone(), )?; - let flat_storage_resharder = FlatStorageResharder::new(runtime_adapter.clone()); // Create flat storage or initiate migration to flat storage. let flat_storage_creator = FlatStorageCreator::new( epoch_manager.clone(), runtime_adapter.clone(), chain.chain_store(), - &flat_storage_resharder, + &chain.resharding_manager.flat_storage_resharder.clone(), chain_config.background_migration_threads, )?; let sharded_tx_pool = @@ -409,7 +410,7 @@ impl Client { NonZeroUsize::new(PRODUCTION_TIMES_CACHE_SIZE).unwrap(), ), tier1_accounts_cache: None, - flat_storage_resharder, + resharding_sender, flat_storage_creator, last_time_sync_block_requested: HashMap::new(), chunk_validator, diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs index 66942b3102a..b82621f8d43 100644 --- a/chain/client/src/client_actor.rs +++ b/chain/client/src/client_actor.rs @@ -31,6 +31,7 @@ use near_chain::chain::{ ApplyChunksDoneMessage, BlockCatchUpRequest, BlockCatchUpResponse, ChunkStateWitnessMessage, }; use near_chain::rayon_spawner::RayonAsyncComputationSpawner; +use near_chain::resharding::types::ReshardingSender; use near_chain::state_snapshot_actor::SnapshotCallbacks; use near_chain::test_utils::format_hash; use near_chain::types::RuntimeAdapter; @@ -142,6 +143,7 @@ pub fn start_client( partial_witness_adapter: PartialWitnessSenderForClient, enable_doomslug: bool, seed: Option, + resharding_sender: ReshardingSender, ) -> StartClientResult { let client_arbiter = actix::Arbiter::new(); let client_arbiter_handle = client_arbiter.handle(); @@ -165,6 +167,7 @@ pub fn start_client( snapshot_callbacks, Arc::new(RayonAsyncComputationSpawner), partial_witness_adapter, + resharding_sender, state_sync_future_spawner, chain_sender_for_state_sync.as_multi_sender(), ) diff --git a/chain/client/src/info.rs b/chain/client/src/info.rs index 05194acebfe..8d5def490ba 100644 --- a/chain/client/src/info.rs +++ b/chain/client/src/info.rs @@ -955,7 +955,7 @@ fn get_validator_production_status( mod tests { use super::*; use assert_matches::assert_matches; - use near_async::messaging::{noop, IntoSender}; + use near_async::messaging::{noop, IntoMultiSender, IntoSender}; use near_async::time::Clock; use near_chain::rayon_spawner::RayonAsyncComputationSpawner; use near_chain::runtime::NightshadeRuntime; @@ -1017,6 +1017,7 @@ mod tests { None, Arc::new(RayonAsyncComputationSpawner), validator.clone(), + noop().into_multi_sender(), ) .unwrap(); diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 152738d8ed0..be1eaa70bcb 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -22,6 +22,8 @@ use near_async::messaging::{ }; use near_async::time::{Clock, Duration, Instant, Utc}; use 
near_chain::rayon_spawner::RayonAsyncComputationSpawner; +use near_chain::resharding::resharding_actor::ReshardingActor; +use near_chain::resharding::types::ReshardingSender; use near_chain::state_snapshot_actor::SnapshotCallbacks; use near_chain::test_utils::{KeyValueRuntime, MockEpochManager, ValidatorSchedule}; use near_chain::types::{ChainConfig, RuntimeAdapter}; @@ -171,6 +173,9 @@ pub fn setup( )); let partial_witness_adapter = partial_witness_addr.with_auto_span_context(); + let (resharding_sender_addr, _) = spawn_actix_actor(ReshardingActor::new()); + let resharding_sender = resharding_sender_addr.with_auto_span_context(); + let shards_manager_adapter_for_client = LateBoundSender::new(); let StartClientResult { client_actor, .. } = start_client( clock, @@ -193,6 +198,7 @@ pub fn setup( partial_witness_adapter.clone().into_multi_sender(), enable_doomslug, Some(TEST_SEED), + resharding_sender.into_multi_sender(), ); let validator_signer = Some(Arc::new(EmptyValidatorSigner::new(account_id))); let (shards_manager_addr, _) = start_shards_manager( @@ -273,6 +279,7 @@ pub fn setup_only_view( None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap(); @@ -1026,6 +1033,7 @@ pub fn setup_client_with_runtime( snapshot_callbacks: Option, partial_witness_adapter: PartialWitnessSenderForClient, validator_signer: Arc, + resharding_sender: ReshardingSender, ) -> Client { let mut config = ClientConfig::test(true, 10, 20, num_validator_seats, archive, save_trie_changes, true); @@ -1051,6 +1059,7 @@ pub fn setup_client_with_runtime( snapshot_callbacks, Arc::new(RayonAsyncComputationSpawner), partial_witness_adapter, + resharding_sender, Arc::new(ActixFutureSpawner), noop().into_multi_sender(), // state sync ignored for these tests ) @@ -1093,6 +1102,7 @@ pub fn setup_synchronous_shards_manager( None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap(); let chain_head = chain.head().unwrap(); diff --git a/chain/client/src/test_utils/test_env.rs b/chain/client/src/test_utils/test_env.rs index 770d8f65c15..ee3397c686c 100644 --- a/chain/client/src/test_utils/test_env.rs +++ b/chain/client/src/test_utils/test_env.rs @@ -644,6 +644,7 @@ impl TestEnv { None, self.clients[idx].partial_witness_adapter.clone(), self.clients[idx].validator_signer.get().unwrap(), + self.clients[idx].resharding_sender.clone(), ) } diff --git a/chain/client/src/test_utils/test_env_builder.rs b/chain/client/src/test_utils/test_env_builder.rs index d3880070c23..22c0176f45a 100644 --- a/chain/client/src/test_utils/test_env_builder.rs +++ b/chain/client/src/test_utils/test_env_builder.rs @@ -4,7 +4,7 @@ use super::test_env::TestEnv; use super::{AccountIndices, TEST_SEED}; use actix_rt::System; use itertools::{multizip, Itertools}; -use near_async::messaging::{IntoMultiSender, IntoSender}; +use near_async::messaging::{noop, IntoMultiSender, IntoSender}; use near_async::time::Clock; use near_chain::state_snapshot_actor::SnapshotCallbacks; use near_chain::test_utils::{KeyValueRuntime, MockEpochManager, ValidatorSchedule}; @@ -619,6 +619,7 @@ impl TestEnvBuilder { Some(snapshot_callbacks), partial_witness_adapter.into_multi_sender(), validator_signer, + noop().into_multi_sender(), ) }) .collect(); diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index db8ad582f32..e08a38996e8 100644 --- a/core/chain-configs/src/client_config.rs +++ 
b/core/chain-configs/src/client_config.rs @@ -211,7 +211,7 @@ impl Default for EpochSyncConfig { // A handle that allows the main process to interrupt resharding if needed. // This typically happens when the main process is interrupted. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ReshardingHandle { keep_going: Arc, } diff --git a/core/store/src/flat/types.rs b/core/store/src/flat/types.rs index 0dd2a5ff933..cf983e1446d 100644 --- a/core/store/src/flat/types.rs +++ b/core/store/src/flat/types.rs @@ -186,7 +186,7 @@ pub struct SplittingParentStatus { pub right_child_shard: ShardUId, /// The new shard layout. pub shard_layout: ShardLayout, - /// Hash of the first block having the new shard layout. + /// Hash of the last block having the old shard layout. pub block_hash: CryptoHash, /// The block before `block_hash`. pub prev_block_hash: CryptoHash, diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs index 810112eee6c..7f333da676a 100644 --- a/integration-tests/src/test_loop/builder.rs +++ b/integration-tests/src/test_loop/builder.rs @@ -41,6 +41,7 @@ use tempfile::TempDir; use super::env::{ClientToShardsManagerSender, TestData, TestLoopChunksStorage, TestLoopEnv}; use super::utils::network::{chunk_endorsement_dropper, partial_encoded_chunks_dropper}; +use near_chain::resharding::resharding_actor::ReshardingActor; pub(crate) struct TestLoopBuilder { test_loop: TestLoopV2, @@ -244,6 +245,7 @@ impl TestLoopBuilder { let state_snapshot_adapter = LateBoundSender::new(); let partial_witness_adapter = LateBoundSender::new(); let sync_jobs_adapter = LateBoundSender::new(); + let resharding_sender = LateBoundSender::new(); let genesis = self.genesis.as_ref().unwrap(); let epoch_config_store = self.epoch_config_store.as_ref().unwrap(); @@ -388,6 +390,7 @@ impl TestLoopBuilder { Some(snapshot_callbacks), Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))), partial_witness_adapter.as_multi_sender(), + resharding_sender.as_multi_sender(), Arc::new(self.test_loop.future_spawner()), client_adapter.as_multi_sender(), ) @@ -482,6 +485,8 @@ impl TestLoopBuilder { // We don't send messages to `GCActor` so adapter is not needed. self.test_loop.register_actor_for_index(idx, gc_actor, None); + let resharding_actor = ReshardingActor::new(); + let future_spawner = self.test_loop.future_spawner(); let state_sync_dumper = StateSyncDumper { clock: self.test_loop.clock(), @@ -515,6 +520,7 @@ impl TestLoopBuilder { ); self.test_loop.register_actor_for_index(idx, sync_jobs_actor, Some(sync_jobs_adapter)); self.test_loop.register_actor_for_index(idx, state_snapshot, Some(state_snapshot_adapter)); + self.test_loop.register_actor_for_index(idx, resharding_actor, Some(resharding_sender)); // State sync dumper is not an Actor, handle starting separately. 
let state_sync_dumper_handle_clone = state_sync_dumper_handle.clone(); diff --git a/integration-tests/src/test_loop/tests/simple_test_loop_example.rs b/integration-tests/src/test_loop/tests/simple_test_loop_example.rs index 41bcf7c5ef6..147f85584cb 100644 --- a/integration-tests/src/test_loop/tests/simple_test_loop_example.rs +++ b/integration-tests/src/test_loop/tests/simple_test_loop_example.rs @@ -105,6 +105,7 @@ fn test_client_with_simple_test_loop() { None, Arc::new(test_loop.async_computation_spawner(|_| Duration::milliseconds(80))), noop().into_multi_sender(), + noop().into_multi_sender(), Arc::new(test_loop.future_spawner()), noop().into_multi_sender(), ) diff --git a/integration-tests/src/tests/genesis_helpers.rs b/integration-tests/src/tests/genesis_helpers.rs index 7f109ad33c2..7a6a9f6c9af 100644 --- a/integration-tests/src/tests/genesis_helpers.rs +++ b/integration-tests/src/tests/genesis_helpers.rs @@ -1,3 +1,4 @@ +use near_async::messaging::{noop, IntoMultiSender}; use near_async::time::Clock; use near_chain::rayon_spawner::RayonAsyncComputationSpawner; use near_chain::types::ChainConfig; @@ -39,6 +40,7 @@ fn genesis_header(genesis: &Genesis) -> BlockHeader { None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap(); chain.genesis().clone() @@ -65,6 +67,7 @@ pub fn genesis_block(genesis: &Genesis) -> Block { None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap(); chain.get_block(&chain.genesis().hash().clone()).unwrap() diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index 8980e0492ef..97249322fa6 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -120,6 +120,7 @@ fn setup_network_node( noop().into_multi_sender(), true, None, + noop().into_multi_sender(), ) .client_actor; let view_client_addr = ViewClientActorInner::spawn_actix_actor( diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index cb96927d264..a0025d172f2 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -15,6 +15,7 @@ use near_async::actix_wrapper::{spawn_actix_actor, ActixWrapper}; use near_async::futures::TokioRuntimeFutureSpawner; use near_async::messaging::{IntoMultiSender, IntoSender, LateBoundSender}; use near_async::time::{self, Clock}; +use near_chain::resharding::resharding_actor::ReshardingActor; pub use near_chain::runtime::NightshadeRuntime; use near_chain::state_snapshot_actor::{ get_delete_snapshot_callback, get_make_snapshot_callback, SnapshotCallbacks, StateSnapshotActor, @@ -384,6 +385,8 @@ pub fn start_with_config_and_synchronization( config.client_config.archive, )); + let (resharding_sender_addr, _) = spawn_actix_actor(ReshardingActor::new()); + let resharding_sender = resharding_sender_addr.with_auto_span_context(); let state_sync_runtime = Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); @@ -408,6 +411,7 @@ pub fn start_with_config_and_synchronization( partial_witness_actor.clone().with_auto_span_context().into_multi_sender(), true, None, + resharding_sender.into_multi_sender(), ); if let SyncConfig::Peers = config.client_config.state_sync.sync { client_adapter_for_sync.bind(client_actor.clone().with_auto_span_context()) diff --git a/tools/database/src/resharding_v2.rs b/tools/database/src/resharding_v2.rs index 9b40ca6ed52..ec0145c8293 100644 --- 
a/tools/database/src/resharding_v2.rs +++ b/tools/database/src/resharding_v2.rs @@ -2,6 +2,7 @@ use core::time; use std::path::{Path, PathBuf}; use std::sync::Arc; +use near_async::messaging::{noop, IntoMultiSender}; use near_async::time::Clock; use near_chain::rayon_spawner::RayonAsyncComputationSpawner; use near_chain::resharding::v2::ReshardingResponse; @@ -160,6 +161,8 @@ impl ReshardingV2Command { None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + // Resharding sender is not used in resharding-v2. + noop().into_multi_sender(), ) .unwrap(); Ok(chain) diff --git a/tools/speedy_sync/Cargo.toml b/tools/speedy_sync/Cargo.toml index 864dc208844..a464b098f23 100644 --- a/tools/speedy_sync/Cargo.toml +++ b/tools/speedy_sync/Cargo.toml @@ -20,6 +20,7 @@ nearcore.workspace = true near-chain-configs.workspace = true near-chain.workspace = true near-epoch-manager.workspace = true +near-async.workspace = true borsh.workspace = true serde.workspace = true diff --git a/tools/speedy_sync/src/main.rs b/tools/speedy_sync/src/main.rs index 8b839bc9116..046df49d5ba 100644 --- a/tools/speedy_sync/src/main.rs +++ b/tools/speedy_sync/src/main.rs @@ -1,4 +1,5 @@ use borsh::{BorshDeserialize, BorshSerialize}; +use near_async::messaging::{noop, IntoMultiSender}; use near_chain::rayon_spawner::RayonAsyncComputationSpawner; use near_chain::types::{ChainConfig, Tip}; use near_chain::{Chain, ChainGenesis, DoomslugThresholdMode}; @@ -256,6 +257,7 @@ fn load_snapshot(load_cmd: LoadCmd) { None, Arc::new(RayonAsyncComputationSpawner), MutableConfigValue::new(None, "validator_signer"), + noop().into_multi_sender(), ) .unwrap();
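A minimal sketch of how the new wiring fits together, assuming only the APIs introduced above; `SyncScheduler` is a hypothetical stand-in for a real handler such as `ReshardingActor`. Anything implementing `CanSend<FlatStorageSplitShardRequest>` can be converted into a `ReshardingSender`, while code paths that never trigger resharding pass a no-op sender, exactly as the call sites in this patch do:

use near_async::messaging::{noop, CanSend, IntoMultiSender};
use near_chain::resharding::types::{FlatStorageSplitShardRequest, ReshardingSender};

// Hypothetical handler that runs the split inline on the calling thread,
// mirroring the TestScheduler used in the tests of this patch.
struct SyncScheduler;

impl CanSend<FlatStorageSplitShardRequest> for SyncScheduler {
    fn send(&self, msg: FlatStorageSplitShardRequest) {
        // The request carries the resharder itself, so a handler only needs
        // to invoke the long-running task.
        msg.resharder.split_shard_task();
    }
}

fn build_senders() -> (ReshardingSender, ReshardingSender) {
    // Components that never reshard (tools, some tests) ignore the messages.
    let ignored: ReshardingSender = noop().into_multi_sender();
    // Otherwise the sender is backed by a real handler.
    let inline: ReshardingSender = SyncScheduler.into_multi_sender();
    (ignored, inline)
}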