Skip to content

Commit

Permalink
[CATCHUP] - Support and test restarting without losing state (#2612)
Browse files Browse the repository at this point in the history
* Add view to Initializer and update SystemContext initialization

* Update test

* Move late_decided_leaf and fix build

* Add test

* Replace genesis function

* Fix genesis

* Fix genesis state

* try logging

* remove logging

* Undo justfile change

* Fix build

* Fix param after merge

* Fix build

* Rename and remove incremental fu

* Fix build after merge

* Add skip_late to test metadata, fix cur_view, modify num failures, remove instance_state reference

* Update instead of increment view

* Add view update to more tasks

* Fix next_view for view sync
  • Loading branch information
shenkeyao authored Feb 23, 2024
1 parent f736f98 commit d0a2638
Show file tree
Hide file tree
Showing 19 changed files with 308 additions and 128 deletions.
2 changes: 1 addition & 1 deletion crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ pub trait RunDA<
/// get the anchored view
/// Note: sequencing leaf does not have state, so does not return state
async fn initialize_state_and_hotshot(&self) -> SystemContextHandle<TYPES, NODE> {
let initializer = hotshot::HotShotInitializer::<TYPES>::from_genesis(&TestInstanceState {})
let initializer = hotshot::HotShotInitializer::<TYPES>::from_genesis(TestInstanceState {})
.expect("Couldn't generate genesis block");

let config = self.get_config();
Expand Down
52 changes: 34 additions & 18 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> Networks<TYPES, I> {
}

/// Bundle of all the memberships a consensus instance uses
#[derive(Clone)]
pub struct Memberships<TYPES: NodeType> {
/// Quorum Membership
pub quorum_membership: TYPES::Membership,
Expand Down Expand Up @@ -155,8 +156,7 @@ pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>> {
}

impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
/// Creates a new [`Arc<SystemContext>`] with the given configuration options and sets it up with the given
/// genesis block
/// Creates a new [`Arc<SystemContext>`] with the given configuration options.
///
/// To do a full initialization, use `fn init` instead, which will set up background tasks as
/// well.
Expand Down Expand Up @@ -187,7 +187,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {

// insert genesis (or latest block) to state map
let mut validated_state_map = BTreeMap::default();
let validated_state = Arc::new(TYPES::ValidatedState::genesis(&instance_state));
let validated_state = Arc::new(TYPES::ValidatedState::from_header(
&anchored_leaf.block_header,
));
validated_state_map.insert(
anchored_leaf.get_view_number(),
View {
Expand All @@ -211,10 +213,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
}
};
saved_payloads.insert(anchored_leaf.get_view_number(), encoded_txns.clone());
// View 1 doesn't have DA which is responsible for saving the payloads, so we store the
// payload for view 1 manually during the initialization.
saved_payloads.insert(TYPES::Time::new(1), encoded_txns);
}

let start_view = anchored_leaf.get_view_number();
let start_view = initializer.start_view;

let consensus = Consensus {
instance_state,
Expand Down Expand Up @@ -279,7 +283,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
broadcast_event(event, &self.output_event_stream.0).await;
}

/// Publishes a transaction asynchronously to the network
/// Publishes a transaction asynchronously to the network.
///
/// # Errors
///
Expand All @@ -290,11 +294,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
transaction: TYPES::Transaction,
) -> Result<(), HotShotError<TYPES>> {
trace!("Adding transaction to our own queue");
// Wrap up a message
// TODO place a view number here that makes sense
// we haven't worked out how this will work yet
let message = DataMessage::SubmitTransaction(transaction.clone(), TYPES::Time::new(0));

let api = self.clone();
let view_number = api.consensus.read().await.cur_view;

// Wrap up a message
let message = DataMessage::SubmitTransaction(transaction.clone(), view_number);

async_spawn(async move {
let da_membership = &api.memberships.da_membership.clone();
Expand All @@ -312,11 +317,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> SystemContext<TYPES, I> {
sender: api.public_key.clone(),
kind: MessageKind::from(message),
},
da_membership.get_committee(TYPES::Time::new(0)),
da_membership.get_committee(view_number),
),
api
.send_external_event(Event {
view_number: api.consensus.read().await.cur_view,
view_number,
event: EventType::Transactions {
transactions: vec![transaction],
},
Expand Down Expand Up @@ -606,26 +611,37 @@ pub struct HotShotInitializer<TYPES: NodeType> {

/// Instance-level state.
instance_state: TYPES::InstanceState,

/// Starting view number that we are confident won't lead to a double vote after restart.
start_view: TYPES::Time,
}

impl<TYPES: NodeType> HotShotInitializer<TYPES> {
/// initialize from genesis
/// # Errors
/// If we are unable to apply the genesis block to the default state
pub fn from_genesis(
instance_state: &TYPES::InstanceState,
) -> Result<Self, HotShotError<TYPES>> {
pub fn from_genesis(instance_state: TYPES::InstanceState) -> Result<Self, HotShotError<TYPES>> {
Ok(Self {
inner: Leaf::genesis(instance_state),
instance_state: instance_state.clone(),
inner: Leaf::genesis(&instance_state),
instance_state,
start_view: TYPES::Time::new(0),
})
}

/// reload previous state based on most recent leaf and the instance-level state.
pub fn from_reload(anchor_leaf: Leaf<TYPES>, instance_state: TYPES::InstanceState) -> Self {
/// Reload previous state based on most recent leaf and the instance-level state.
///
/// # Arguments
/// * `start_view` - The minimum view number that we are confident won't lead to a double vote
/// after restart.
pub fn from_reload(
anchor_leaf: Leaf<TYPES>,
instance_state: TYPES::InstanceState,
start_view: TYPES::Time,
) -> Self {
Self {
inner: anchor_leaf,
instance_state,
start_view,
}
}
}
12 changes: 6 additions & 6 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ pub async fn add_consensus_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
rx: Receiver<HotShotEvent<TYPES>>,
handle: &SystemContextHandle<TYPES, I>,
) {
let consensus_state = ConsensusTaskState::create_from(handle);
let consensus_state = ConsensusTaskState::create_from(handle).await;

inject_consensus_polls(&consensus_state).await;

Expand All @@ -187,7 +187,7 @@ pub async fn add_vid_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
rx: Receiver<HotShotEvent<TYPES>>,
handle: &SystemContextHandle<TYPES, I>,
) {
let vid_state = VIDTaskState::create_from(handle);
let vid_state = VIDTaskState::create_from(handle).await;
let task = Task::new(tx, rx, task_reg.clone(), vid_state);
task_reg.run_task(task).await;
}
Expand All @@ -199,7 +199,7 @@ pub async fn add_upgrade_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
rx: Receiver<HotShotEvent<TYPES>>,
handle: &SystemContextHandle<TYPES, I>,
) {
let upgrade_state = UpgradeTaskState::create_from(handle);
let upgrade_state = UpgradeTaskState::create_from(handle).await;

let task = Task::new(tx, rx, task_reg.clone(), upgrade_state);
task_reg.run_task(task).await;
Expand All @@ -212,7 +212,7 @@ pub async fn add_da_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
handle: &SystemContextHandle<TYPES, I>,
) {
// build the da task
let da_state = DATaskState::create_from(handle);
let da_state = DATaskState::create_from(handle).await;

let task = Task::new(tx, rx, task_reg.clone(), da_state);
task_reg.run_task(task).await;
Expand All @@ -225,7 +225,7 @@ pub async fn add_transaction_task<TYPES: NodeType, I: NodeImplementation<TYPES>>
rx: Receiver<HotShotEvent<TYPES>>,
handle: &SystemContextHandle<TYPES, I>,
) {
let transactions_state = TransactionTaskState::create_from(handle);
let transactions_state = TransactionTaskState::create_from(handle).await;

let task = Task::new(tx, rx, task_reg.clone(), transactions_state);
task_reg.run_task(task).await;
Expand All @@ -238,7 +238,7 @@ pub async fn add_view_sync_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
rx: Receiver<HotShotEvent<TYPES>>,
handle: &SystemContextHandle<TYPES, I>,
) {
let view_sync_state = ViewSyncTaskState::create_from(handle);
let view_sync_state = ViewSyncTaskState::create_from(handle).await;

let task = Task::new(tx, rx, task_reg.clone(), view_sync_state);
task_reg.run_task(task).await;
Expand Down
37 changes: 23 additions & 14 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::types::SystemContextHandle;

use async_trait::async_trait;
use hotshot_constants::VERSION_0_1;
use hotshot_task_impls::{
consensus::{CommitmentAndMetadata, ConsensusTaskState},
Expand All @@ -24,24 +25,26 @@ use std::{
};

/// Trait for creating task states.
#[async_trait]
pub trait CreateTaskState<TYPES, I>
where
TYPES: NodeType,
I: NodeImplementation<TYPES>,
{
/// Function to create the task state from a given `SystemContextHandle`.
fn create_from(handle: &SystemContextHandle<TYPES, I>) -> Self;
async fn create_from(handle: &SystemContextHandle<TYPES, I>) -> Self;
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
for UpgradeTaskState<TYPES, I, SystemContextHandle<TYPES, I>>
{
fn create_from(
async fn create_from(
handle: &SystemContextHandle<TYPES, I>,
) -> UpgradeTaskState<TYPES, I, SystemContextHandle<TYPES, I>> {
UpgradeTaskState {
api: handle.clone(),
cur_view: TYPES::Time::new(0),
cur_view: handle.get_cur_view().await,
quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
quorum_network: handle.hotshot.networks.quorum_network.clone(),
should_vote: |_upgrade_proposal| false,
Expand All @@ -53,16 +56,17 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
}
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
for VIDTaskState<TYPES, I, SystemContextHandle<TYPES, I>>
{
fn create_from(
async fn create_from(
handle: &SystemContextHandle<TYPES, I>,
) -> VIDTaskState<TYPES, I, SystemContextHandle<TYPES, I>> {
VIDTaskState {
api: handle.clone(),
consensus: handle.hotshot.get_consensus(),
cur_view: TYPES::Time::new(0),
cur_view: handle.get_cur_view().await,
vote_collector: None,
network: handle.hotshot.networks.quorum_network.clone(),
membership: handle.hotshot.memberships.vid_membership.clone().into(),
Expand All @@ -73,10 +77,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
}
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
for DATaskState<TYPES, I, SystemContextHandle<TYPES, I>>
{
fn create_from(
async fn create_from(
handle: &SystemContextHandle<TYPES, I>,
) -> DATaskState<TYPES, I, SystemContextHandle<TYPES, I>> {
DATaskState {
Expand All @@ -85,7 +90,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
da_membership: handle.hotshot.memberships.da_membership.clone().into(),
da_network: handle.hotshot.networks.da_network.clone(),
quorum_membership: handle.hotshot.memberships.quorum_membership.clone().into(),
cur_view: TYPES::Time::new(0),
cur_view: handle.get_cur_view().await,
vote_collector: None.into(),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
Expand All @@ -94,15 +99,17 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
}
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
for ViewSyncTaskState<TYPES, I, SystemContextHandle<TYPES, I>>
{
fn create_from(
async fn create_from(
handle: &SystemContextHandle<TYPES, I>,
) -> ViewSyncTaskState<TYPES, I, SystemContextHandle<TYPES, I>> {
let cur_view = handle.get_cur_view().await;
ViewSyncTaskState {
current_view: TYPES::Time::new(0),
next_view: TYPES::Time::new(0),
current_view: cur_view,
next_view: cur_view,
network: handle.hotshot.networks.quorum_network.clone(),
membership: handle
.hotshot
Expand All @@ -125,18 +132,19 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
}
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
for TransactionTaskState<TYPES, I, SystemContextHandle<TYPES, I>>
{
fn create_from(
async fn create_from(
handle: &SystemContextHandle<TYPES, I>,
) -> TransactionTaskState<TYPES, I, SystemContextHandle<TYPES, I>> {
TransactionTaskState {
api: handle.clone(),
consensus: handle.hotshot.get_consensus(),
transactions: Arc::default(),
seen_transactions: HashSet::new(),
cur_view: TYPES::Time::new(0),
cur_view: handle.get_cur_view().await,
network: handle.hotshot.networks.quorum_network.clone(),
membership: handle.hotshot.memberships.quorum_membership.clone().into(),
public_key: handle.public_key().clone(),
Expand All @@ -146,10 +154,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
}
}

#[async_trait]
impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
for ConsensusTaskState<TYPES, I, SystemContextHandle<TYPES, I>>
{
fn create_from(
async fn create_from(
handle: &SystemContextHandle<TYPES, I>,
) -> ConsensusTaskState<TYPES, I, SystemContextHandle<TYPES, I>> {
let consensus = handle.hotshot.get_consensus();
Expand All @@ -163,7 +172,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>> CreateTaskState<TYPES, I>
ConsensusTaskState {
consensus,
timeout: handle.hotshot.config.next_view_timeout,
cur_view: TYPES::Time::new(0),
cur_view: handle.get_cur_view().await,
payload_commitment_and_metadata: Some(CommitmentAndMetadata {
commitment: payload_commitment,
metadata,
Expand Down
5 changes: 2 additions & 3 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static> SystemContextHandl
self.hotshot.public_key.clone()
}

/// Wrapper to get this node's current view
#[cfg(feature = "hotshot-testing")]
pub async fn get_current_view(&self) -> TYPES::Time {
/// Wrapper to get the view number this node is on.
pub async fn get_cur_view(&self) -> TYPES::Time {
self.hotshot.consensus.read().await.cur_view
}
}
19 changes: 14 additions & 5 deletions crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,15 +395,24 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
.await;
}
}));
let consensus = self.consensus.read().await;
let consensus = self.consensus.upgradable_read().await;
consensus
.metrics
.current_view
.set(usize::try_from(self.cur_view.get_u64()).unwrap());
consensus.metrics.number_of_views_since_last_decide.set(
usize::try_from(self.cur_view.get_u64()).unwrap()
- usize::try_from(consensus.last_decided_view.get_u64()).unwrap(),
);
// Do the comparison before the subtraction to avoid potential overflow, since
// `last_decided_view` may be greater than `cur_view` if the node is catching up.
if usize::try_from(self.cur_view.get_u64()).unwrap()
> usize::try_from(consensus.last_decided_view.get_u64()).unwrap()
{
consensus.metrics.number_of_views_since_last_decide.set(
usize::try_from(self.cur_view.get_u64()).unwrap()
- usize::try_from(consensus.last_decided_view.get_u64()).unwrap(),
);
}
let mut consensus = RwLockUpgradableReadGuard::upgrade(consensus).await;
consensus.update_view(new_view);
drop(consensus);

return true;
}
Expand Down
1 change: 1 addition & 0 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
warn!("View changed by more than 1 going to view {:?}", view);
}
self.cur_view = view;
self.consensus.write().await.update_view(view);

// Inject view info into network
let is_da = self
Expand Down
1 change: 1 addition & 0 deletions crates/task-impls/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
make_block = self.membership.get_leader(view) == self.public_key;
}
self.cur_view = view;
self.consensus.write().await.update_view(view);

// return if we aren't the next leader or we skipped last view and aren't the current leader.
if !make_block && self.membership.get_leader(self.cur_view + 1) != self.public_key {
Expand Down
Loading

0 comments on commit d0a2638

Please sign in to comment.