-
Notifications
You must be signed in to change notification settings - Fork 622
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(resharding): introduce resharding actor #12217
feat(resharding): introduce resharding actor #12217
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@@ -361,7 +362,9 @@ impl Chain { | |||
let resharding_manager = ReshardingManager::new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we even need resharding manager in the view client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see a reason. I'll make a note to ask @shreyan-gupta and then remove the manager from there
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't be needed, but we would have to make it Option then?
@@ -52,13 +54,24 @@ use near_store::{ShardUId, StorageError}; | |||
pub struct FlatStorageResharder { | |||
runtime: Arc<dyn RuntimeAdapter>, | |||
resharding_event: Arc<Mutex<Option<FlatStorageReshardingEventStatus>>>, | |||
scheduler: messaging::Sender<FlatStorageSplitShardRequest>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe import messaging? I don't think there is any ambiguity about it.
/// * `runtime`: runtime adapter | ||
/// * `scheduler`: component used to schedule the background tasks | ||
/// * `controller`: manages the execution of the background tasks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's pretty!
fn split_shard_task(resharder: FlatStorageResharder, controller: FlatStorageResharderController) { | ||
let task_status = split_shard_task_impl(resharder.clone(), controller.clone()); | ||
pub fn split_shard_task(resharder: FlatStorageResharder) { | ||
let task_status = split_shard_task_impl(resharder.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sanity check: What's up with the clone
s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, it's not really needed anymore. Reference is enough.
@@ -501,7 +501,7 @@ pub enum FlatStorageReshardingTaskStatus { | |||
|
|||
/// Helps control the flat storage resharder operation. More specifically, | |||
/// it has a way to know when the background task is done or to interrupt it. | |||
#[derive(Clone)] | |||
#[derive(Clone, Debug)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm surprised that sender and receiver implement Debug but alright.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Caught me by surprise as well
} | ||
|
||
impl ReshardingManager { | ||
pub fn new( | ||
store: Store, | ||
epoch_manager: Arc<dyn EpochManagerAdapter>, | ||
runtime_adapter: Arc<dyn RuntimeAdapter>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my education, why do you need the runtime adapter in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is that now FlatStorageResharder
is owned and created by ReshardingManager
. runtime
is needed by FlatStorageResharder
ReshardingEventType::SplitShard(split_shard_event.clone()), | ||
&next_shard_layout, | ||
)?, | ||
None => tracing::info!(target: "resharding", "flat storage resharder not initialized"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should that be tracing::error?
} | ||
|
||
pub fn handle_flat_storage_split_shard_request(&mut self, msg: FlatStorageSplitShardRequest) { | ||
split_shard_task(msg.resharder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Would it be any nicer as a method? msg.resharder.split_shard_task()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I did the change and it looks nicer to me
|
||
#[derive(actix::Message, Clone, Debug)] | ||
#[rtype(result = "()")] | ||
pub struct FlatStorageSplitShardRequest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add comments to the pub structs?
|
||
#[derive(Clone, near_async::MultiSend, near_async::MultiSenderFrom)] | ||
pub struct ReshardingSender { | ||
pub flat_storage_split_shard_send: Sender<FlatStorageSplitShardRequest>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future we'd want a sender for any type of resharding request and SplitShard would be one of the variants in the Request enum. No need to do it now though.
ReshardingEventType::SplitShard(split_shard_event.clone()), | ||
&next_shard_layout, | ||
)?, | ||
None => tracing::error!(target: "resharding", "flat storage resharder not initialized"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should not we panic if the resharder is not initialized? or it is based on some protocol versioning? if so, it should not error message right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Converting resharding sender to non-option should remove this case.
@@ -609,7 +605,10 @@ mod tests { | |||
} | |||
|
|||
/// Generic test setup. | |||
fn create_fs_resharder(shard_layout: ShardLayout) -> (Chain, FlatStorageResharder) { | |||
fn create_fs_resharder( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. what does fs stand for here? if flat_storage, can we expand the name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it stands for flat storage yes
function name aged quite badly, I'll change it!
@@ -23,15 +27,27 @@ pub struct ReshardingManager { | |||
/// A handle that allows the main process to interrupt resharding if needed. | |||
/// This typically happens when the main process is interrupted. | |||
pub resharding_handle: ReshardingHandle, | |||
/// Takes care of performing resharding on the flat storage. | |||
pub flat_storage_resharder: Option<FlatStorageResharder>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer having this non-optional (maybe add a TODO as it is now convenient for this PR) in the future since as far as I know, the resharding will fail if this is not initialized yet? or can it still succeed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would too prefer to have this non-optional. I think we should address this as part of this PR itself. For the places where we don't have a sender or don't need to pass a sender, we can just pass in noop() sender.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try to remove the Option and pass a noop()!
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #12217 +/- ##
=======================================
Coverage 71.72% 71.72%
=======================================
Files 833 835 +2
Lines 166700 166684 -16
Branches 166700 166684 -16
=======================================
- Hits 119573 119562 -11
+ Misses 41902 41901 -1
+ Partials 5225 5221 -4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall but have a couple of comments
let resharder = self.clone(); | ||
let task = Box::new(move || split_shard_task(resharder, controller)); | ||
scheduler.schedule(task); | ||
self.scheduler.send(FlatStorageSplitShardRequest { resharder }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not look right, why are we sending an instance of resharder along with request? Can the actor hold an instance of resharder as a member instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it should be easy to make the actor hold copy of resharder
split_shard_request: Arc<RwLock<VecDeque<FlatStorageSplitShardRequest>>>, | ||
} | ||
|
||
impl CanSend<FlatStorageSplitShardRequest> for MockReshardingSender { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the old framework for tests, before test loop. Let's try to keep it as lean as possible and not introduce any new functionality here? It would be really really really really really really great it we can just have a noop sender and kill all this code and just say we don't support resharding for old framework.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try to kill this code and replace with the least amount of code just to make stuff compile
@@ -23,15 +27,27 @@ pub struct ReshardingManager { | |||
/// A handle that allows the main process to interrupt resharding if needed. | |||
/// This typically happens when the main process is interrupted. | |||
pub resharding_handle: ReshardingHandle, | |||
/// Takes care of performing resharding on the flat storage. | |||
pub flat_storage_resharder: Option<FlatStorageResharder>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would too prefer to have this non-optional. I think we should address this as part of this PR itself. For the places where we don't have a sender or don't need to pass a sender, we can just pass in noop() sender.
ReshardingEventType::SplitShard(split_shard_event.clone()), | ||
&next_shard_layout, | ||
)?, | ||
None => tracing::error!(target: "resharding", "flat storage resharder not initialized"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Converting resharding sender to non-option should remove this case.
} | ||
|
||
pub fn handle_flat_storage_split_shard_request(&mut self, msg: FlatStorageSplitShardRequest) { | ||
msg.resharder.split_shard_task(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm honestly not too happy that we are calling a function from resharder here.
For the purposes of reducing mental burden of identifying what happens where, ideally all the logic to deal with the heavy task of splitting should have been a part of the ReshardingActor class. This is too much of indirection to deal with.
Is it possible for us to push the logic of splitting the flat storage here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought the general sentiment was to keep the actor as simple as possible, thus all the work and unit testing is done in the resharder. Any other thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would love input from other folks, @wacban @Longarithm!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have strong feelings either way. My only recommendation would be to keep the actor api part (typically the handle method) as slim as possible. This way we can properly unit test the actual implementation without needed to spin up the actor or worry about async. So something like this:
fn handle(request) {
the_actual_implementation(request);
}
mod tests {
// here the the_actual_implementation can be unit tested without needing to bootstrap an entire actor
@@ -401,6 +405,7 @@ pub fn start_with_config_and_synchronization( | |||
partial_witness_actor.clone().with_auto_span_context().into_multi_sender(), | |||
true, | |||
None, | |||
resharding_sender.into_multi_sender(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ugh, we need a better way to handle adding new actors and new actix requests. Note to self; I'll see if I can come up with better ways of handling all actor senders in a single unified struct.
FlatStorageResharder::new( | ||
runtime_adapter, | ||
sender.into_sender(), | ||
FlatStorageResharderController::from_resharding_handle(resharding_handle.clone()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still unsure how does the controller work and what do we do with the sender and receiver here? I'll have to go back and do a quick read up on how crossbeam_channels work but we should ideally not be introducing a new framework here given we have actors and message senders? Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I planned to remove the channels in another PR because I thought they could come in handy but they did not. I can push the change here
I've addressed most of the comments! I left out two things:
I plan to do further refactoring and improvements in separate PRs |
Main changes introduced by this PR:
FlatStorageResharder
resume
andstart
in all code paths.ReshardingActor
and related Sender/Request items. Unfortunately, this meant propagating an extra argument throughClient
andChain
.FlatStorageResharder
inReshardingManager
. Wrapped insideOption
because it makes initialization ofChain
easier, and it's not needed in many tests anyway.FlatStorageResharder
to work with aSender
. Functionality is the same as before.