-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CATCHUP] Repro Restart Bug + Fix #3686
Changes from all commits
299b8b1
8991b9c
1313952
8848bb7
ec529cd
c3cab62
896ff21
3f2506a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,10 +10,13 @@ use std::{ | |
}; | ||
|
||
use anyhow::Result; | ||
use async_broadcast::broadcast; | ||
use async_lock::RwLock; | ||
use async_trait::async_trait; | ||
use futures::future::join_all; | ||
use hotshot::{traits::TestableNodeImplementation, types::EventType, HotShotInitializer}; | ||
use hotshot::{ | ||
traits::TestableNodeImplementation, types::EventType, HotShotInitializer, SystemContext, | ||
}; | ||
use hotshot_example_types::{ | ||
auction_results_provider_types::TestAuctionResultsProvider, | ||
block_types::TestBlockHeader, | ||
|
@@ -22,6 +25,7 @@ use hotshot_example_types::{ | |
testable_delay::DelayConfig, | ||
}; | ||
use hotshot_types::{ | ||
constants::EVENT_CHANNEL_SIZE, | ||
data::Leaf, | ||
event::Event, | ||
simple_certificate::QuorumCertificate, | ||
|
@@ -47,7 +51,12 @@ pub type StateAndBlock<S, B> = (Vec<S>, Vec<B>); | |
pub struct SpinningTaskErr {} | ||
|
||
/// Spinning task state | ||
pub struct SpinningTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V: Versions> { | ||
pub struct SpinningTask< | ||
TYPES: NodeType, | ||
N: ConnectedNetwork<TYPES::SignatureKey>, | ||
I: TestableNodeImplementation<TYPES>, | ||
V: Versions, | ||
> { | ||
/// handle to the nodes | ||
pub(crate) handles: Arc<RwLock<Vec<Node<TYPES, I, V>>>>, | ||
/// late start nodes | ||
|
@@ -62,6 +71,8 @@ pub struct SpinningTask<TYPES: NodeType, I: TestableNodeImplementation<TYPES>, V | |
pub(crate) high_qc: QuorumCertificate<TYPES>, | ||
/// Add specified delay to async calls | ||
pub(crate) async_delay_config: DelayConfig, | ||
/// Context stored for nodes to be restarted with | ||
pub(crate) restart_contexts: HashMap<usize, RestartContext<TYPES, N, I, V>>, | ||
} | ||
|
||
#[async_trait] | ||
|
@@ -74,7 +85,7 @@ impl< | |
I: TestableNodeImplementation<TYPES>, | ||
N: ConnectedNetwork<TYPES::SignatureKey>, | ||
V: Versions, | ||
> TestTaskState for SpinningTask<TYPES, I, V> | ||
> TestTaskState for SpinningTask<TYPES, N, I, V> | ||
where | ||
I: TestableNodeImplementation<TYPES>, | ||
I: NodeImplementation< | ||
|
@@ -117,7 +128,7 @@ where | |
if let Some(operations) = self.changes.remove(&view_number) { | ||
for ChangeNode { idx, updown } in operations { | ||
match updown { | ||
UpDown::Up => { | ||
NodeAction::Up => { | ||
let node_id = idx.try_into().unwrap(); | ||
if let Some(node) = self.late_start.remove(&node_id) { | ||
tracing::error!("Node {} spinning up late", idx); | ||
|
@@ -187,13 +198,13 @@ where | |
self.handles.write().await.push(node); | ||
} | ||
} | ||
UpDown::Down => { | ||
NodeAction::Down => { | ||
if let Some(node) = self.handles.write().await.get_mut(idx) { | ||
tracing::error!("Node {} shutting down", idx); | ||
node.handle.shut_down().await; | ||
} | ||
} | ||
UpDown::Restart => { | ||
NodeAction::RestartDown(delay_views) => { | ||
let node_id = idx.try_into().unwrap(); | ||
if let Some(node) = self.handles.write().await.get_mut(idx) { | ||
tracing::error!("Node {} shutting down", idx); | ||
|
@@ -217,7 +228,7 @@ where | |
self.last_decided_leaf.clone(), | ||
TestInstanceState::new(self.async_delay_config.clone()), | ||
None, | ||
view_number, | ||
read_storage.last_actioned_view().await, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this equivalent to what we do in the sequencer? Do we use the last actioned view or something else? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be equivalent, the sequencer only has access to storage and has a very similar function to get the view number |
||
read_storage.last_actioned_view().await, | ||
read_storage.proposals_cloned().await, | ||
read_storage.high_qc_cloned().await.unwrap_or( | ||
|
@@ -238,6 +249,7 @@ where | |
// For tests, make the node DA based on its index | ||
node_id < config.da_staked_committee_size as u64, | ||
); | ||
let internal_chan = broadcast(EVENT_CHANNEL_SIZE); | ||
let context = | ||
TestRunner::<TYPES, I, V, N>::add_node_with_config_and_channels( | ||
node_id, | ||
|
@@ -248,27 +260,44 @@ where | |
validator_config, | ||
(*read_storage).clone(), | ||
marketplace_config.clone(), | ||
( | ||
node.handle.internal_channel_sender(), | ||
node.handle.internal_event_stream_receiver_known_impl(), | ||
), | ||
internal_chan, | ||
( | ||
node.handle.external_channel_sender(), | ||
node.handle.event_stream_known_impl(), | ||
node.handle.event_stream_known_impl().new_receiver(), | ||
), | ||
) | ||
.await; | ||
new_nodes.push((context, idx)); | ||
new_networks.push(network.clone()); | ||
if delay_views == 0 { | ||
new_nodes.push((context, idx)); | ||
new_networks.push(network.clone()); | ||
} else { | ||
let up_view = view_number + delay_views; | ||
let change = ChangeNode { | ||
idx, | ||
updown: NodeAction::RestartUp, | ||
}; | ||
self.changes.entry(up_view).or_default().push(change); | ||
let new_ctx = RestartContext { | ||
context, | ||
network: network.clone(), | ||
}; | ||
self.restart_contexts.insert(idx, new_ctx); | ||
} | ||
} | ||
} | ||
UpDown::NetworkUp => { | ||
NodeAction::RestartUp => { | ||
if let Some(ctx) = self.restart_contexts.remove(&idx) { | ||
new_nodes.push((ctx.context, idx)); | ||
new_networks.push(ctx.network.clone()); | ||
} | ||
} | ||
NodeAction::NetworkUp => { | ||
if let Some(handle) = self.handles.write().await.get(idx) { | ||
tracing::error!("Node {} networks resuming", idx); | ||
handle.network.resume(); | ||
} | ||
} | ||
UpDown::NetworkDown => { | ||
NodeAction::NetworkDown => { | ||
if let Some(handle) = self.handles.write().await.get(idx) { | ||
tracing::error!("Node {} networks pausing", idx); | ||
handle.network.pause(); | ||
|
@@ -286,6 +315,7 @@ where | |
join_all(ready_futs).await; | ||
|
||
while let Some((node, id)) = new_nodes.pop() { | ||
tracing::error!("Starting node {} back up", id); | ||
let handle = node.run_tasks().await; | ||
|
||
// Create the node and add it to the state, so we can shut them | ||
|
@@ -312,9 +342,20 @@ where | |
} | ||
} | ||
|
||
#[derive(Clone)] | ||
pub(crate) struct RestartContext< | ||
TYPES: NodeType, | ||
N: ConnectedNetwork<TYPES::SignatureKey>, | ||
I: TestableNodeImplementation<TYPES>, | ||
V: Versions, | ||
> { | ||
context: Arc<SystemContext<TYPES, I, V>>, | ||
network: Arc<N>, | ||
} | ||
|
||
/// Spin the node up or down | ||
#[derive(Clone, Debug)] | ||
pub enum UpDown { | ||
pub enum NodeAction { | ||
/// spin the node up | ||
Up, | ||
/// spin the node down | ||
|
@@ -323,8 +364,11 @@ pub enum UpDown { | |
NetworkUp, | ||
/// spin the node's network down | ||
NetworkDown, | ||
/// restart the node | ||
Restart, | ||
/// Take a node down to be restarted after a number of views | ||
RestartDown(u64), | ||
/// Start a node up again after it's been shutdown for restart. This | ||
bfish713 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// should only be created following a `ResartDown` | ||
RestartUp, | ||
} | ||
|
||
/// denotes a change in node state | ||
|
@@ -333,7 +377,7 @@ pub struct ChangeNode { | |
/// the index of the node | ||
pub idx: usize, | ||
/// spin the node or node's network up or down | ||
pub updown: UpDown, | ||
pub updown: NodeAction, | ||
} | ||
|
||
/// description of the spinning task | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked over the
update_action
logic and this looks good 👍