Skip to content

Commit

Permalink
[CATCHUP] Init Consensus with 2 views to prevent Double Vote/Propose (#3648)
Browse files Browse the repository at this point in the history

* Add second view param to init, spin nodes in tests up from view 0

* add last actioned views to consensus

* check view before doing action

* fmt and lint

* log error when trying to double propose/vote

* fix for DA votes

* still update the da vote view

* restart with actioned view from storage

* lint

* Allow access to consensus from byz tests, reset actions in byz behavior

* lint
  • Loading branch information
bfish713 authored Sep 5, 2024
1 parent 3403dc2 commit fb9f0e0
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 22 deletions.
19 changes: 16 additions & 3 deletions crates/example-types/src/storage_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ use async_trait::async_trait;
use hotshot_types::{
consensus::CommitmentMap,
data::{DaProposal, Leaf, QuorumProposal, VidDisperseShare},
event::HotShotAction,
message::Proposal,
simple_certificate::QuorumCertificate,
traits::{node_implementation::NodeType, storage::Storage},
traits::{
node_implementation::{ConsensusTime, NodeType},
storage::Storage,
},
utils::View,
vote::HasViewNumber,
};
Expand All @@ -35,6 +39,7 @@ pub struct TestStorageState<TYPES: NodeType> {
das: HashMap<TYPES::Time, Proposal<TYPES, DaProposal<TYPES>>>,
proposals: BTreeMap<TYPES::Time, Proposal<TYPES, QuorumProposal<TYPES>>>,
high_qc: Option<hotshot_types::simple_certificate::QuorumCertificate<TYPES>>,
action: TYPES::Time,
}

impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
Expand All @@ -44,6 +49,7 @@ impl<TYPES: NodeType> Default for TestStorageState<TYPES> {
das: HashMap::new(),
proposals: BTreeMap::new(),
high_qc: None,
action: TYPES::Time::genesis(),
}
}
}
Expand Down Expand Up @@ -85,6 +91,9 @@ impl<TYPES: NodeType> TestStorage<TYPES> {
pub async fn high_qc_cloned(&self) -> Option<QuorumCertificate<TYPES>> {
self.inner.read().await.high_qc.clone()
}
pub async fn last_actioned_view(&self) -> TYPES::Time {
self.inner.read().await.action
}
}

#[async_trait]
Expand Down Expand Up @@ -131,12 +140,16 @@ impl<TYPES: NodeType> Storage<TYPES> for TestStorage<TYPES> {

async fn record_action(
&self,
_view: <TYPES as NodeType>::Time,
_action: hotshot_types::event::HotShotAction,
view: <TYPES as NodeType>::Time,
action: hotshot_types::event::HotShotAction,
) -> Result<()> {
if self.should_return_err {
bail!("Failed to append Action to storage");
}
let mut inner = self.inner.write().await;
if view > inner.action && matches!(action, HotShotAction::Vote | HotShotAction::Propose) {
inner.action = view;
}
Self::run_delay_settings_from_config(&self.delay_config).await;
Ok(())
}
Expand Down
11 changes: 8 additions & 3 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
validated_state_map,
anchored_leaf.view_number(),
anchored_leaf.view_number(),
// TODO this is incorrect
// https://github.com/EspressoSystems/HotShot/issues/560
anchored_leaf.view_number(),
initializer.actioned_view,
initializer.saved_proposals,
saved_leaves,
saved_payloads,
Expand Down Expand Up @@ -951,8 +950,11 @@ pub struct HotShotInitializer<TYPES: NodeType> {
/// If it's given, we'll use it to construct the `SystemContext`.
state_delta: Option<Arc<<TYPES::ValidatedState as ValidatedState<TYPES>>::Delta>>,

/// Starting view number that we are confident won't lead to a double vote after restart.
/// Starting view number that should be equivalent to the view the node shut down with last.
start_view: TYPES::Time,
/// The view we last performed an action in. An action is proposing or voting for
/// either the quorum or DA.
actioned_view: TYPES::Time,
/// Highest QC that was seen, for genesis it's the genesis QC. It should be for a view greater
/// than `inner`s view number for the non genesis case because we must have seen higher QCs
/// to decide on the leaf.
Expand Down Expand Up @@ -981,6 +983,7 @@ impl<TYPES: NodeType> HotShotInitializer<TYPES> {
validated_state: Some(Arc::new(validated_state)),
state_delta: Some(Arc::new(state_delta)),
start_view: TYPES::Time::new(0),
actioned_view: TYPES::Time::new(0),
saved_proposals: BTreeMap::new(),
high_qc,
undecided_leafs: Vec::new(),
Expand All @@ -1002,6 +1005,7 @@ impl<TYPES: NodeType> HotShotInitializer<TYPES> {
instance_state: TYPES::InstanceState,
validated_state: Option<Arc<TYPES::ValidatedState>>,
start_view: TYPES::Time,
actioned_view: TYPES::Time,
saved_proposals: BTreeMap<TYPES::Time, Proposal<TYPES, QuorumProposal<TYPES>>>,
high_qc: QuorumCertificate<TYPES>,
undecided_leafs: Vec<Leaf<TYPES>>,
Expand All @@ -1013,6 +1017,7 @@ impl<TYPES: NodeType> HotShotInitializer<TYPES> {
validated_state,
state_delta: None,
start_view,
actioned_view,
saved_proposals,
high_qc,
undecided_leafs,
Expand Down
6 changes: 5 additions & 1 deletion crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use hotshot_task_impls::{
vid::VidTaskState,
view_sync::ViewSyncTaskState,
};
use hotshot_types::message::UpgradeLock;
use hotshot_types::{consensus::Consensus, message::UpgradeLock};
use hotshot_types::{
constants::EVENT_CHANNEL_SIZE,
message::Messages,
Expand Down Expand Up @@ -191,6 +191,7 @@ pub fn add_network_event_task<
membership,
filter,
storage: Arc::clone(&handle.storage()),
consensus: Arc::clone(&handle.consensus()),
upgrade_lock: handle.hotshot.upgrade_lock.clone(),
};
let task = Task::new(
Expand Down Expand Up @@ -288,6 +289,7 @@ where
public_key: &TYPES::SignatureKey,
private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
upgrade_lock: &UpgradeLock<TYPES, V>,
consensus: Arc<RwLock<Consensus<TYPES>>>,
) -> Vec<HotShotEvent<TYPES>>;

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -384,6 +386,7 @@ where
let public_key = handle.public_key().clone();
let private_key = handle.private_key().clone();
let upgrade_lock = handle.hotshot.upgrade_lock.clone();
let consensus = Arc::clone(&handle.hotshot.consensus());
let send_handle = async_spawn(async move {
futures::pin_mut!(shutdown_signal);

Expand Down Expand Up @@ -415,6 +418,7 @@ where
&public_key,
&private_key,
&upgrade_lock,
Arc::clone(&consensus)
).await;
results.reverse();
while let Some(event) = results.pop() {
Expand Down
2 changes: 1 addition & 1 deletion crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ pub async fn validate_proposal_safety_and_liveness<

// Update our internal storage of the proposal. The proposal is valid, so
// we swallow this error and just log if it occurs.
if let Err(e) = consensus_write.update_last_proposed_view(proposal.clone()) {
if let Err(e) = consensus_write.update_proposed_view(proposal.clone()) {
tracing::debug!("Internal proposal update failed; error = {e:#}");
};

Expand Down
19 changes: 13 additions & 6 deletions crates/task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use async_lock::RwLock;
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::Consensus,
data::{VidDisperse, VidDisperseShare},
event::{Event, EventType, HotShotAction},
message::{
Expand Down Expand Up @@ -226,6 +227,8 @@ pub struct NetworkEventTaskState<
pub filter: fn(&Arc<HotShotEvent<TYPES>>) -> bool,
/// Storage to store actionable events
pub storage: Arc<RwLock<S>>,
/// Shared consensus state
pub consensus: Arc<RwLock<Consensus<TYPES>>>,
/// Lock for a decided upgrade
pub upgrade_lock: UpgradeLock<TYPES, V>,
}
Expand Down Expand Up @@ -313,10 +316,12 @@ impl<

let net = Arc::clone(&self.channel);
let storage = Arc::clone(&self.storage);
let state = Arc::clone(&self.consensus);
async_spawn(async move {
if NetworkEventTaskState::<TYPES, V, COMMCHANNEL, S>::maybe_record_action(
Some(HotShotAction::VidDisperse),
storage,
state,
view,
)
.await
Expand All @@ -337,15 +342,15 @@ impl<
async fn maybe_record_action(
maybe_action: Option<HotShotAction>,
storage: Arc<RwLock<S>>,
state: Arc<RwLock<Consensus<TYPES>>>,
view: <TYPES as NodeType>::Time,
) -> Result<(), ()> {
if let Some(action) = maybe_action {
match storage
.write()
.await
.record_action(view, action.clone())
.await
{
if !state.write().await.update_action(action, view) {
warn!("Already actioned {:?} in view {:?}", action, view);
return Err(());
}
match storage.write().await.record_action(view, action).await {
Ok(()) => Ok(()),
Err(e) => {
warn!("Not Sending {:?} because of storage error: {:?}", action, e);
Expand Down Expand Up @@ -546,11 +551,13 @@ impl<
let committee_topic = membership.committee_topic();
let net = Arc::clone(&self.channel);
let storage = Arc::clone(&self.storage);
let state = Arc::clone(&self.consensus);
let upgrade_lock = self.upgrade_lock.clone();
async_spawn(async move {
if NetworkEventTaskState::<TYPES, V, COMMCHANNEL, S>::maybe_record_action(
maybe_action,
Arc::clone(&storage),
state,
view,
)
.await
Expand Down
11 changes: 11 additions & 0 deletions crates/testing/src/byzantine/byzantine_behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use anyhow::Context;
use async_lock::RwLock;
use async_trait::async_trait;
use hotshot::tasks::EventTransformerState;
use hotshot::types::{SignatureKey, SystemContextHandle};
use hotshot_task_impls::events::HotShotEvent;
use hotshot_task_impls::network::test::{ModifierClosure, NetworkEventTaskStateModifier};
use hotshot_task_impls::network::NetworkEventTaskState;
use hotshot_types::consensus::Consensus;
use hotshot_types::message::UpgradeLock;
use hotshot_types::simple_vote::QuorumVote;
use hotshot_types::traits::node_implementation::ConsensusTime;
Expand Down Expand Up @@ -39,12 +41,15 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> EventTransforme
_public_key: &TYPES::SignatureKey,
_private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
_upgrade_lock: &UpgradeLock<TYPES, V>,
consensus: Arc<RwLock<Consensus<TYPES>>>,
) -> Vec<HotShotEvent<TYPES>> {
match event {
HotShotEvent::QuorumProposalSend(proposal, signature) => {
let mut result = Vec::new();

for n in 0..self.multiplier {
// reset last actioned view so we actually propose multiple times
consensus.write().await.reset_actions();
let mut modified_proposal = proposal.clone();

modified_proposal.data.view_number += n * self.increment;
Expand Down Expand Up @@ -80,6 +85,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> EventTransforme
_public_key: &TYPES::SignatureKey,
_private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
_upgrade_lock: &UpgradeLock<TYPES, V>,
_consensus: Arc<RwLock<Consensus<TYPES>>>,
) -> Vec<HotShotEvent<TYPES>> {
match event {
HotShotEvent::QuorumProposalSend(_, _) | HotShotEvent::QuorumVoteSend(_) => {
Expand Down Expand Up @@ -155,6 +161,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
_public_key: &TYPES::SignatureKey,
_private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
_upgrade_lock: &UpgradeLock<TYPES, V>,
_consensus: Arc<RwLock<Consensus<TYPES>>>,
) -> Vec<HotShotEvent<TYPES>> {
match event {
HotShotEvent::QuorumProposalSend(proposal, sender) => {
Expand Down Expand Up @@ -195,6 +202,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
_public_key: &TYPES::SignatureKey,
_private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
_upgrade_lock: &UpgradeLock<TYPES, V>,
_consensus: Arc<RwLock<Consensus<TYPES>>>,
) -> Vec<HotShotEvent<TYPES>> {
if let HotShotEvent::DacSend(cert, sender) = event {
self.total_da_certs_sent_from_node += 1;
Expand Down Expand Up @@ -264,6 +272,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
_public_key: &TYPES::SignatureKey,
_private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
_upgrade_lock: &UpgradeLock<TYPES, V>,
_consensus: Arc<RwLock<Consensus<TYPES>>>,
) -> Vec<HotShotEvent<TYPES>> {
vec![event.clone()]
}
Expand Down Expand Up @@ -291,6 +300,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
public_key: &TYPES::SignatureKey,
private_key: &<TYPES::SignatureKey as SignatureKey>::PrivateKey,
upgrade_lock: &UpgradeLock<TYPES, V>,
_consensus: Arc<RwLock<Consensus<TYPES>>>,
) -> Vec<HotShotEvent<TYPES>> {
if let HotShotEvent::QuorumVoteSend(vote) = event {
let new_view = vote.view_number + self.view_increment;
Expand Down Expand Up @@ -323,6 +333,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
membership,
filter,
storage: Arc::clone(&handle.storage()),
consensus: Arc::clone(&handle.consensus()),
upgrade_lock: handle.hotshot.upgrade_lock.clone(),
};
let modified_network_state = NetworkEventTaskStateModifier {
Expand Down
5 changes: 4 additions & 1 deletion crates/testing/src/spinning_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use hotshot_example_types::{
storage_types::TestStorage,
testable_delay::DelayConfig,
};
use hotshot_types::traits::node_implementation::ConsensusTime;
use hotshot_types::{
data::Leaf,
event::Event,
Expand Down Expand Up @@ -134,7 +135,8 @@ where
self.last_decided_leaf.clone(),
TestInstanceState::new(self.async_delay_config.clone()),
None,
view_number,
TYPES::Time::genesis(),
TYPES::Time::genesis(),
BTreeMap::new(),
self.high_qc.clone(),
Vec::new(),
Expand Down Expand Up @@ -212,6 +214,7 @@ where
TestInstanceState::new(self.async_delay_config.clone()),
None,
view_number,
read_storage.last_actioned_view().await,
read_storage.proposals_cloned().await,
read_storage.high_qc_cloned().await.unwrap_or(
QuorumCertificate::genesis(
Expand Down
17 changes: 12 additions & 5 deletions crates/testing/tests/tests_1/network_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use hotshot_task_impls::{
network::{self, NetworkEventTaskState},
};
use hotshot_testing::{
test_builder::TestDescription, test_task::add_network_message_test_task,
view_generator::TestViewGenerator,
helpers::build_system_handle, test_builder::TestDescription,
test_task::add_network_message_test_task, view_generator::TestViewGenerator,
};
use hotshot_types::{
data::ViewNumber,
Expand All @@ -28,7 +28,6 @@ use hotshot_types::{
node_implementation::{ConsensusTime, NodeType},
},
};

// Test that the event task sends a message, and the message task receives it
// and emits the proper event
#[cfg(test)]
Expand All @@ -46,12 +45,15 @@ async fn test_network_task() {
TestDescription::default_multiple_rounds();
let upgrade_lock = UpgradeLock::<TestTypes, TestVersions>::new();
let node_id = 1;

let handle = build_system_handle::<TestTypes, MemoryImpl, TestVersions>(node_id)
.await
.0;
let launcher = builder.gen_launcher(node_id);

let network = (launcher.resource_generator.channel_generator)(node_id).await;

let storage = Arc::new(RwLock::new((launcher.resource_generator.storage)(node_id)));
let consensus = handle.hotshot.consensus();
let config = launcher.resource_generator.config.clone();
let public_key = config.my_own_validator_config.public_key;
let known_nodes_with_stake = config.known_nodes_with_stake.clone();
Expand All @@ -70,6 +72,7 @@ async fn test_network_task() {
filter: network::quorum_filter,
upgrade_lock: upgrade_lock.clone(),
storage,
consensus,
};
let (tx, rx) = async_broadcast::broadcast(10);
let mut task_reg = ConsensusTaskRegistry::new();
Expand Down Expand Up @@ -120,11 +123,14 @@ async fn test_network_storage_fail() {
let builder: TestDescription<TestTypes, MemoryImpl, TestVersions> =
TestDescription::default_multiple_rounds();
let node_id = 1;

let handle = build_system_handle::<TestTypes, MemoryImpl, TestVersions>(node_id)
.await
.0;
let launcher = builder.gen_launcher(node_id);

let network = (launcher.resource_generator.channel_generator)(node_id).await;

let consensus = handle.hotshot.consensus();
let storage = Arc::new(RwLock::new((launcher.resource_generator.storage)(node_id)));
storage.write().await.should_return_err = true;
let config = launcher.resource_generator.config.clone();
Expand All @@ -146,6 +152,7 @@ async fn test_network_storage_fail() {
filter: network::quorum_filter,
upgrade_lock: upgrade_lock.clone(),
storage,
consensus,
};
let (tx, rx) = async_broadcast::broadcast(10);
let mut task_reg = ConsensusTaskRegistry::new();
Expand Down
Loading

0 comments on commit fb9f0e0

Please sign in to comment.