Skip to content

Commit

Permalink
feat(resharding): introduce resharding actor (#12217)
Browse files Browse the repository at this point in the history
Main changes introduced by this PR:
- Calling `FlatStorageResharder` `resume` and `start` in all code paths.
- Added a very thin `ReshardingActor` and related Sender/Request items.
Unfortunately, this meant propagating an extra argument through `Client`
and `Chain`.
- Moved `FlatStorageResharder` in `ReshardingManager`. Wrapped inside
`Option` because it makes initialization of `Chain` easier, and it's not
needed in many tests anyway.
- Refactored `FlatStorageResharder` to work with a `Sender`.
Functionality is the same as before.
- A lot of uninteresting changes just to make old tests compile.
  • Loading branch information
Trisfald authored Oct 16, 2024
1 parent 3cb74c2 commit aa48e52
Show file tree
Hide file tree
Showing 28 changed files with 384 additions and 335 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::missing_chunks::MissingChunksPool;
use crate::orphan::{Orphan, OrphanBlockPool};
use crate::rayon_spawner::RayonAsyncComputationSpawner;
use crate::resharding::manager::ReshardingManager;
use crate::resharding::types::ReshardingSender;
use crate::sharding::shuffle_receipt_proofs;
use crate::state_request_tracker::StateRequestTracker;
use crate::state_snapshot_actor::SnapshotCallbacks;
Expand Down Expand Up @@ -39,6 +40,7 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use lru::LruCache;
use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt};
use near_async::messaging::{noop, IntoMultiSender};
use near_async::time::{Clock, Duration, Instant};
use near_chain_configs::{MutableConfigValue, MutableValidatorSigner};
use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
Expand Down Expand Up @@ -361,7 +363,9 @@ impl Chain {
let resharding_manager = ReshardingManager::new(
store.clone(),
epoch_manager.clone(),
runtime_adapter.clone(),
MutableConfigValue::new(Default::default(), "resharding_config"),
noop().into_multi_sender(),
);
Ok(Chain {
clock: clock.clone(),
Expand Down Expand Up @@ -401,6 +405,7 @@ impl Chain {
snapshot_callbacks: Option<SnapshotCallbacks>,
apply_chunks_spawner: Arc<dyn AsyncComputationSpawner>,
validator: MutableValidatorSigner,
resharding_sender: ReshardingSender,
) -> Result<Chain, Error> {
let state_roots = get_genesis_state_roots(runtime_adapter.store())?
.expect("genesis should be initialized.");
Expand Down Expand Up @@ -537,7 +542,9 @@ impl Chain {
let resharding_manager = ReshardingManager::new(
chain_store.store().clone(),
epoch_manager.clone(),
runtime_adapter.clone(),
chain_config.resharding_config,
resharding_sender,
);
Ok(Chain {
clock: clock.clone(),
Expand Down
7 changes: 3 additions & 4 deletions chain/chain/src/flat_storage_creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ impl FlatStorageCreator {
epoch_manager: &Arc<dyn EpochManagerAdapter>,
flat_storage_manager: &FlatStorageManager,
runtime: &Arc<dyn RuntimeAdapter>,
_flat_storage_resharder: &FlatStorageResharder,
flat_storage_resharder: &FlatStorageResharder,
) -> Result<HashMap<ShardUId, FlatStorageShardCreator>, Error> {
let epoch_id = &chain_head.epoch_id;
tracing::debug!(target: "store", ?epoch_id, "creating flat storage for the current epoch");
Expand All @@ -486,9 +486,8 @@ impl FlatStorageCreator {
);
}
FlatStorageStatus::Disabled => {}
FlatStorageStatus::Resharding(_status) => {
// TODO(Trisfald): call resume
// flat_storage_resharder.resume(shard_uid, &status, ...)?;
FlatStorageStatus::Resharding(status) => {
flat_storage_resharder.resume(shard_uid, &status)?;
}
}
}
Expand Down
Loading

0 comments on commit aa48e52

Please sign in to comment.