[dag] dag rebootstrap #9967
Conversation
consensus/src/dag/bootstrap.rs
Outdated
        error!(error = ?e, "unable to sync");
    }
},
_ = handler.start(&mut dag_rpc_rx) => {}
I don't think this works? Once it starts, it'll never exit the inner loop.
I think it works. As long as we have an .await within the loop, the future will stop polling and yield to the other futures in the outer select!. If we get a rebootstrap notification, that branch completes the select!, so we simply never poll the handler again; instead, we start a new handler.
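
For reference, a minimal self-contained sketch of this yielding behavior, assuming tokio channels and select! (the payload types and everything other than the handler/dag_rpc_rx/rebootstrap_notification_rx names are stand-ins, not the PR's actual code):

use tokio::sync::mpsc;

// Stand-in for handler.start: an inner loop whose every recv().await is a
// yield point back to the outer select!.
async fn handler_start(dag_rpc_rx: &mut mpsc::Receiver<u64>) {
    while let Some(msg) = dag_rpc_rx.recv().await {
        println!("handled rpc {msg}");
    }
}

#[tokio::main]
async fn main() {
    let (_rpc_tx, mut dag_rpc_rx) = mpsc::channel::<u64>(8);
    let (reboot_tx, mut rebootstrap_notification_rx) = mpsc::channel::<u64>(1);
    reboot_tx.send(42).await.unwrap();

    tokio::select! {
        // This branch never completes here, but it yields at every await.
        _ = handler_start(&mut dag_rpc_rx) => {},
        // When this branch completes first, the handler future is dropped and
        // never polled again; a fresh handler can then be bootstrapped.
        Some(node) = rebootstrap_notification_rx.recv() => {
            println!("rebootstrap triggered by node {node}");
        }
    }
}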
},
Some(node) = rebootstrap_notification_rx.recv() => {
    df_handle.abort();
    let _ = df_handle.await;
We should probably have a guard for these services.
Looking at it again, why do we need to separate the trigger and the sync manager? Since we block the handler anyway, why not just check and sync directly inside the handler?
consensus/src/dag/bootstrap.rs
Outdated
    AggregateSignature::empty(),
);

let mut shutdown_rx = shutdown_rx.into_stream();
I don't think you need this, just &mut shutdown_rx is enough
I need this btw, because the oneshot::Receiver seems to be moved into the select statement.
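
For comparison, a hedged sketch of the &mut alternative suggested above (the channel payloads are stand-ins): tokio's oneshot::Receiver is an Unpin future, so a mutable borrow of it is itself a future and select! consumes only the borrow, not the receiver.

use tokio::sync::{mpsc, oneshot};

async fn run(mut work_rx: mpsc::Receiver<u64>, mut shutdown_rx: oneshot::Receiver<()>) {
    loop {
        tokio::select! {
            // Only the &mut is moved into select!, so the receiver survives
            // across loop iterations; break so it is never polled after
            // completion.
            _ = &mut shutdown_rx => break,
            Some(job) = work_rx.recv() => println!("processing job {job}"),
        }
    }
}

The into_stream() version (via futures::FutureExt, which wraps the future as a one-element stream) sidesteps the move the same way, at the cost of an extra wrapper type.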
I think I need to abort the fetch service before starting an ad-hoc fetch for state sync. Otherwise, there could be two fetches for the same nodes happening concurrently?
let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());

if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await {
I thought we're creating a new dag store instead of re-using the current one?
Yes, we are. I pass the existing dag store to do some assertion checks on whether to actually state sync.
Hmm, that sounds weird; shouldn't the check be done in the check function?
Yes, I am being paranoid: I check in the check function, and I assert in the sync_to function.
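
Schematically, the split being described looks like this (the round/window predicate is a made-up stand-in for the real condition):

// Hypothetical: the trigger decides, and sync_dag_to re-asserts defensively.
fn needs_sync(msg_round: u64, local_highest_round: u64, window: u64) -> bool {
    msg_round > local_highest_round + window
}

fn sync_dag_to(msg_round: u64, local_highest_round: u64, window: u64) {
    // Callers should only reach this once the check has passed; assert anyway.
    assert!(needs_sync(msg_round, local_highest_round, window));
    // ... fetch the remote DAG and rebuild the local store ...
}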
let (handler, fetch_service) =
    self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);

let df_handle = tokio::spawn(fetch_service.start());
Nit: we can just create a drop guard like this and avoid the abort/await lines in both branches?
pub struct DropGuard {
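
Only the first line of the suggested guard survives in the quote above; an abort-on-drop guard typically looks roughly like this (the codebase's actual DropGuard may differ):

use tokio::task::JoinHandle;

pub struct DropGuard {
    handle: JoinHandle<()>,
}

impl DropGuard {
    pub fn new(handle: JoinHandle<()>) -> Self {
        Self { handle }
    }
}

impl Drop for DropGuard {
    fn drop(&mut self) {
        // Dropping the guard aborts the task, so every exit path tears the
        // service down without explicit abort()/await pairs per branch.
        self.handle.abort();
    }
}

// Illustrative usage, replacing the manual df_handle bookkeeping:
// let _df_guard = DropGuard::new(tokio::spawn(fetch_service.start()));

One trade-off: Drop is synchronous, so the guard can only request the abort; it cannot .await the handle to confirm the task has fully stopped, the way `let _ = df_handle.await` does.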
match certified_node_msg.verify(&self.epoch_state.verifier) {
    Ok(_) => match self.state_sync_trigger.check(certified_node_msg).await {
        ret @ (NeedsSync(_), None) => return Ok(ret.0),
        (Synced, Some(certified_node_msg)) => self
The message can be carried in StateSyncStatus::Synced to avoid the second Option?
I need to send the Synced status from the process_rpc fn as well, so I either clone the certified_node_msg or use the second Option.
Alternatively, I could have two enums: the state sync check returns one enum, and I convert it to the other for process_rpc. I thought that was too much.
I don't think we need to define two enums; this function can just return a Result<(), CertifiedNodeMessage>?
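
A hedged sketch of the two shapes discussed in this thread, with placeholder types and a guessed Ok/Err orientation:

#[derive(Clone)]
struct CertifiedNodeMessage; // placeholder for the real message type

// Earlier suggestion: let the variants own the message, so no second Option
// rides alongside the status in a tuple.
enum StateSyncStatus {
    NeedsSync(CertifiedNodeMessage),
    Synced(Option<CertifiedNodeMessage>),
}

// One plausible reading of the Result suggestion: Err carries the message
// that triggered the sync back out to the bootstrapper, while Ok(()) means
// "already in sync, keep processing" (the caller still holds the message,
// since check only borrows it here).
fn check(msg: &CertifiedNodeMessage, needs_sync: bool) -> Result<(), CertifiedNodeMessage> {
    if needs_sync {
        Err(msg.clone())
    } else {
        Ok(())
    }
}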
Nice!
consensus/src/dag/dag_state_sync.rs
Outdated
I think we're now ready to increase the dag_window.
Yes, when we introduce the onchain config.
    dag.clone(),
    self.time_service.clone(),
);
let fetch_requester = Arc::new(fetch_requester);
Nit: why can't you return an Arc from DagFetcherService::new?
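
The nit as code, with illustrative names (the real constructor's arguments are omitted):

use std::sync::Arc;

struct FetchRequester;
struct DagFetcherService;

impl DagFetcherService {
    // Hypothetical signature: constructing the Arc inside new saves every
    // caller the Arc::new(fetch_requester) line quoted above.
    fn new(/* ... */) -> (Self, Arc<FetchRequester>) {
        (Self, Arc::new(FetchRequester))
    }
}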
Description
This PR introduces the rebootstrap logic for the DAG. Essentially, when the DAG needs to state sync, we abort the existing handlers and return to the bootstrapper, which state syncs the DAG, recreates all the components, and starts the handlers again. This unifies the logic so that the same bootstrapping path serves both recovery and state sync.
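
As a rough schematic (not the PR's actual code; Outcome, bootstrap_components, and state_sync_to are illustrative placeholders), the control flow is:

use std::future::Future;

enum Outcome {
    Shutdown,
    NeedsSync(u64), // placeholder for the certified node message driving sync
}

// Stand-ins for the real handler and fetch service futures.
fn bootstrap_components() -> (
    impl Future<Output = Outcome>,
    impl Future<Output = ()> + Send + 'static,
) {
    (async { Outcome::Shutdown }, async {})
}

async fn state_sync_to(_target: u64) {
    // Stand-in for syncing the DAG to the remote target.
}

async fn bootstrap_loop() {
    loop {
        let (handler, fetch_service) = bootstrap_components();
        let df_handle = tokio::spawn(fetch_service);

        match handler.await {
            Outcome::Shutdown => {
                df_handle.abort();
                break;
            },
            Outcome::NeedsSync(target) => {
                // Abort the fetch service first so no background fetch races
                // the ad-hoc state-sync fetch for the same nodes.
                df_handle.abort();
                let _ = df_handle.await;
                state_sync_to(target).await;
                // Fall through: the loop recreates every component fresh.
            },
        }
    }
}

#[tokio::main]
async fn main() {
    bootstrap_loop().await;
}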
Test Plan
Existing Tests