Skip to content

Commit

Permalink
Canonicalize some variable names, minor cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
pls148 committed Oct 25, 2024
1 parent 01ea0ab commit 727695b
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 181 deletions.
6 changes: 3 additions & 3 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
async fn create_from(handle: &SystemContextHandle<TYPES, I, V>) -> Self {
Self {
network: Arc::clone(&handle.hotshot.network),
state: OuterConsensus::new(handle.hotshot.consensus()),
consensus: OuterConsensus::new(handle.hotshot.consensus()),
view: handle.cur_view().await,
delay: handle.hotshot.config.data_request_delay,
da_membership: handle.hotshot.memberships.da_membership.clone(),
Expand Down Expand Up @@ -162,9 +162,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
let cur_view = handle.cur_view().await;

Self {
current_view: cur_view,
cur_view,
next_view: cur_view,
current_epoch: handle.cur_epoch().await,
cur_epoch: handle.cur_epoch().await,
network: Arc::clone(&handle.hotshot.network),
membership: handle.hotshot.memberships.quorum_membership.clone().into(),
public_key: handle.public_key().clone(),
Expand Down
17 changes: 10 additions & 7 deletions crates/task-impls/src/consensus/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ pub(crate) async fn handle_view_change<
))
.await;

let consensus = task_state.consensus.read().await;
consensus
let consensus_reader = task_state.consensus.read().await;
consensus_reader
.metrics
.current_view
.set(usize::try_from(task_state.cur_view.u64()).unwrap());
Expand All @@ -181,7 +181,7 @@ pub(crate) async fn handle_view_change<
== task_state.public_key
{
#[allow(clippy::cast_precision_loss)]
consensus
consensus_reader
.metrics
.view_duration_as_leader
.add_point((cur_view_time - task_state.cur_view_time) as f64);
Expand All @@ -193,10 +193,13 @@ pub(crate) async fn handle_view_change<
if usize::try_from(task_state.cur_view.u64()).unwrap()
> usize::try_from(task_state.last_decided_view.u64()).unwrap()
{
consensus.metrics.number_of_views_since_last_decide.set(
usize::try_from(task_state.cur_view.u64()).unwrap()
- usize::try_from(task_state.last_decided_view.u64()).unwrap(),
);
consensus_reader
.metrics
.number_of_views_since_last_decide
.set(
usize::try_from(task_state.cur_view.u64()).unwrap()
- usize::try_from(task_state.last_decided_view.u64()).unwrap(),
);
}

broadcast_event(
Expand Down
14 changes: 8 additions & 6 deletions crates/task-impls/src/da.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
.await;
}
HotShotEvent::DaProposalValidated(proposal, sender) => {
let curr_view = self.consensus.read().await.cur_view();
let cur_view = self.consensus.read().await.cur_view();
ensure!(
curr_view <= proposal.data.view_number() + 1,
cur_view <= proposal.data.view_number() + 1,
debug!(
"Validated DA proposal for prior view but it's too old now Current view {:?}, DA Proposal view {:?}",
curr_view,
cur_view,
proposal.data.view_number()
)
);
Expand Down Expand Up @@ -212,18 +212,20 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
tracing::debug!("Sending vote to the DA leader {:?}", vote.view_number());

broadcast_event(Arc::new(HotShotEvent::DaVoteSend(vote)), &event_stream).await;
let mut consensus = self.consensus.write().await;
let mut consensus_writer = self.consensus.write().await;

// Ensure this view is in the view map for garbage collection.
let view = View {
view_inner: ViewInner::Da { payload_commitment },
};
if let Err(e) = consensus.update_validated_state_map(view_number, view.clone()) {
if let Err(e) =
consensus_writer.update_validated_state_map(view_number, view.clone())
{
tracing::trace!("{e:?}");
}

// Record the payload we have promised to make available.
if let Err(e) = consensus.update_saved_payloads(
if let Err(e) = consensus_writer.update_saved_payloads(
view_number,
Arc::clone(&proposal.data.encoded_transactions),
) {
Expand Down
94 changes: 51 additions & 43 deletions crates/task-impls/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
.await;

let mem = Arc::clone(&quorum_membership);
let current_epoch = consensus.read().await.cur_epoch();
let cur_epoch = consensus.read().await.cur_epoch();
// Make a background task to await the arrival of the event data.
let Ok(Some(proposal)) =
// We want to explicitly timeout here so we aren't waiting around for the data.
Expand Down Expand Up @@ -114,7 +114,7 @@ pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
hs_event.as_ref()
{
// Make sure that the quorum_proposal is valid
if quorum_proposal.validate_signature(&mem, current_epoch, upgrade_lock).await.is_ok() {
if quorum_proposal.validate_signature(&mem, cur_epoch, upgrade_lock).await.is_ok() {
proposal = Some(quorum_proposal.clone());
}

Expand All @@ -133,12 +133,12 @@ pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
let justify_qc = proposal.data.justify_qc.clone();

if !justify_qc
.is_valid_cert(quorum_membership.as_ref(), current_epoch, upgrade_lock)
.is_valid_cert(quorum_membership.as_ref(), cur_epoch, upgrade_lock)
.await
{
bail!("Invalid justify_qc in proposal for view {}", *view_number);
}
let mut consensus_write = consensus.write().await;
let mut consensus_writer = consensus.write().await;
let leaf = Leaf::from_quorum_proposal(&proposal.data);
let state = Arc::new(
<TYPES::ValidatedState as ValidatedState<TYPES>>::from_header(&proposal.data.block_header),
Expand All @@ -151,13 +151,14 @@ pub(crate) async fn fetch_proposal<TYPES: NodeType, V: Versions>(
delta: None,
},
};
if let Err(e) = consensus_write.update_validated_state_map(view_number, view.clone()) {
if let Err(e) = consensus_writer.update_validated_state_map(view_number, view.clone()) {
tracing::trace!("{e:?}");
}

consensus_write
consensus_writer
.update_saved_leaves(leaf.clone(), upgrade_lock)
.await;

broadcast_event(
HotShotEvent::ValidatedStateUpdated(view_number, view).into(),
&event_sender,
Expand Down Expand Up @@ -369,21 +370,24 @@ pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
consensus: OuterConsensus<TYPES>,
upgrade_lock: &UpgradeLock<TYPES, V>,
) -> Result<(Leaf<TYPES>, Arc<<TYPES as NodeType>::ValidatedState>)> {
let current_epoch = consensus.read().await.cur_epoch();
let consensus_reader = consensus.read().await;
let cur_epoch = consensus_reader.cur_epoch();
ensure!(
quorum_membership.leader(next_proposal_view_number, current_epoch)? == public_key,
quorum_membership.leader(next_proposal_view_number, cur_epoch)? == public_key,
info!(
"Somehow we formed a QC but are not the leader for the next view {:?}",
next_proposal_view_number
)
);
let parent_view_number = consensus.read().await.high_qc().view_number();
if !consensus
let parent_view_number = consensus_reader.high_qc().view_number();
let vsm_contains_parent_view = consensus
.read()
.await
.validated_state_map()
.contains_key(&parent_view_number)
{
.contains_key(&parent_view_number);
drop(consensus_reader);

if !vsm_contains_parent_view {
let _ = fetch_proposal(
parent_view_number,
event_sender.clone(),
Expand All @@ -397,6 +401,7 @@ pub(crate) async fn parent_leaf_and_state<TYPES: NodeType, V: Versions>(
.await
.context(info!("Failed to fetch proposal"))?;
}

let consensus_reader = consensus.read().await;
let parent_view_number = consensus_reader.high_qc().view_number();
let parent_view = consensus_reader.validated_state_map().get(&parent_view_number).context(
Expand Down Expand Up @@ -463,17 +468,17 @@ pub async fn validate_proposal_safety_and_liveness<
};

{
let mut consensus_write = task_state.consensus.write().await;
if let Err(e) = consensus_write.update_validated_state_map(view_number, view.clone()) {
let mut consensus_writer = task_state.consensus.write().await;
if let Err(e) = consensus_writer.update_validated_state_map(view_number, view.clone()) {
tracing::trace!("{e:?}");
}
consensus_write
consensus_writer
.update_saved_leaves(proposed_leaf.clone(), &task_state.upgrade_lock)
.await;

// Update our internal storage of the proposal. The proposal is valid, so
// we swallow this error and just log if it occurs.
if let Err(e) = consensus_write.update_proposed_view(proposal.clone()) {
if let Err(e) = consensus_writer.update_proposed_view(proposal.clone()) {
tracing::debug!("Internal proposal update failed; error = {e:#}");
};
}
Expand All @@ -485,11 +490,11 @@ pub async fn validate_proposal_safety_and_liveness<
)
.await;

let current_epoch = task_state.cur_epoch;
let cur_epoch = task_state.cur_epoch;
UpgradeCertificate::validate(
&proposal.data.upgrade_certificate,
&task_state.quorum_membership,
current_epoch,
cur_epoch,
&task_state.upgrade_lock,
)
.await?;
Expand All @@ -508,19 +513,19 @@ pub async fn validate_proposal_safety_and_liveness<

// Liveness check.
{
let read_consensus = task_state.consensus.read().await;
let liveness_check = justify_qc.view_number() > read_consensus.locked_view();
let consensus_reader = task_state.consensus.read().await;
let liveness_check = justify_qc.view_number() > consensus_reader.locked_view();

// Safety check.
// Check if proposal extends from the locked leaf.
let outcome = read_consensus.visit_leaf_ancestors(
let outcome = consensus_reader.visit_leaf_ancestors(
justify_qc.view_number(),
Terminator::Inclusive(read_consensus.locked_view()),
Terminator::Inclusive(consensus_reader.locked_view()),
false,
|leaf, _, _| {
// if leaf view no == locked view no then we're done, report success by
// returning true
leaf.view_number() != read_consensus.locked_view()
leaf.view_number() != consensus_reader.locked_view()
},
);
let safety_check = outcome.is_ok();
Expand All @@ -537,7 +542,7 @@ pub async fn validate_proposal_safety_and_liveness<
.await;
}

error!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", read_consensus.high_qc(), proposal.data.clone(), read_consensus.locked_view())
error!("Failed safety and liveness check \n High QC is {:?} Proposal QC is {:?} Locked view is {:?}", consensus_reader.high_qc(), proposal.data.clone(), consensus_reader.locked_view())
});
}

Expand Down Expand Up @@ -592,9 +597,9 @@ pub async fn validate_proposal_view_and_certs<
proposal: &Proposal<TYPES, QuorumProposal<TYPES>>,
task_state: &mut QuorumProposalRecvTaskState<TYPES, I, V>,
) -> Result<()> {
let view = proposal.data.view_number();
let view_number = proposal.data.view_number();
ensure!(
view >= task_state.cur_view,
view_number >= task_state.cur_view,
"Proposal is from an older view {:?}",
proposal.data.clone()
);
Expand All @@ -609,19 +614,19 @@ pub async fn validate_proposal_view_and_certs<
.await?;

// Verify a timeout certificate OR a view sync certificate exists and is valid.
if proposal.data.justify_qc.view_number() != view - 1 {
if proposal.data.justify_qc.view_number() != view_number - 1 {
let received_proposal_cert =
proposal.data.proposal_certificate.clone().context(debug!(
"Quorum proposal for view {} needed a timeout or view sync certificate, but did not have one",
*view
*view_number
))?;

match received_proposal_cert {
ViewChangeEvidence::Timeout(timeout_cert) => {
ensure!(
timeout_cert.data().view == view - 1,
timeout_cert.data().view == view_number - 1,
"Timeout certificate for view {} was not for the immediately preceding view",
*view
*view_number
);
ensure!(
timeout_cert
Expand All @@ -632,15 +637,15 @@ pub async fn validate_proposal_view_and_certs<
)
.await,
"Timeout certificate for view {} was invalid",
*view
*view_number
);
}
ViewChangeEvidence::ViewSync(view_sync_cert) => {
ensure!(
view_sync_cert.view_number == view,
view_sync_cert.view_number == view_number,
"View sync cert view number {:?} does not match proposal view number {:?}",
view_sync_cert.view_number,
view
view_number
);

// View sync certs must also be valid.
Expand Down Expand Up @@ -740,15 +745,15 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>, V
))
.await;

let consensus = task_state.consensus.upgradable_read().await;
consensus
let consensus_reader = task_state.consensus.upgradable_read().await;
consensus_reader
.metrics
.current_view
.set(usize::try_from(task_state.cur_view.u64()).unwrap());
let new_view_time = Utc::now().timestamp();
if is_old_view_leader {
#[allow(clippy::cast_precision_loss)]
consensus
consensus_reader
.metrics
.view_duration_as_leader
.add_point((new_view_time - task_state.cur_view_time) as f64);
Expand All @@ -758,15 +763,18 @@ pub(crate) async fn update_view<TYPES: NodeType, I: NodeImplementation<TYPES>, V
// Do the comparison before the subtraction to avoid potential overflow, since
// `last_decided_view` may be greater than `cur_view` if the node is catching up.
if usize::try_from(task_state.cur_view.u64()).unwrap()
> usize::try_from(consensus.last_decided_view().u64()).unwrap()
> usize::try_from(consensus_reader.last_decided_view().u64()).unwrap()
{
consensus.metrics.number_of_views_since_last_decide.set(
usize::try_from(task_state.cur_view.u64()).unwrap()
- usize::try_from(consensus.last_decided_view().u64()).unwrap(),
);
consensus_reader
.metrics
.number_of_views_since_last_decide
.set(
usize::try_from(task_state.cur_view.u64()).unwrap()
- usize::try_from(consensus_reader.last_decided_view().u64()).unwrap(),
);
}
let mut consensus = ConsensusUpgradableReadLockGuard::upgrade(consensus).await;
if let Err(e) = consensus.update_view(new_view) {
let mut consensus_writer = ConsensusUpgradableReadLockGuard::upgrade(consensus_reader).await;
if let Err(e) = consensus_writer.update_view(new_view) {
tracing::trace!("{e:?}");
}
tracing::trace!("View updated successfully");
Expand Down
Loading

0 comments on commit 727695b

Please sign in to comment.