Skip to content

Commit

Permalink
Merge pull request ProvableHQ#3217 from AleoHQ/feat/aborted-transmiss…
Browse files Browse the repository at this point in the history
…ions-map

[Fix] Add aborted transmission IDs to storage service.
  • Loading branch information
howardwu authored Apr 16, 2024
2 parents ebfe993 + d9c3826 commit dde9e17
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 170 deletions.
118 changes: 59 additions & 59 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ default-features = false

[workspace.dependencies.snarkvm]
git = "https://github.com/AleoHQ/snarkVM.git"
rev = "d48f6fb"
rev = "da3d78a"
#version = "=0.16.18"
features = [ "circuit", "console", "rocks" ]

Expand Down
161 changes: 84 additions & 77 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,10 @@ impl<N: Network> BFT<N> {

impl<N: Network> BFT<N> {
/// Stores the certificate in the DAG, and attempts to commit one or more anchors.
async fn update_dag<const ALLOW_LEDGER_ACCESS: bool>(&self, certificate: BatchCertificate<N>) -> Result<()> {
async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
&self,
certificate: BatchCertificate<N>,
) -> Result<()> {
// Acquire the BFT lock.
let _lock = self.lock.lock().await;

Expand Down Expand Up @@ -519,11 +522,11 @@ impl<N: Network> BFT<N> {
info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));

// Commit the leader certificate, and all previous leader certificates since the last committed round.
self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS>(leader_certificate).await
self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
}

/// Commits the leader certificate, and all previous leader certificates since the last committed round.
async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool>(
async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
&self,
leader_certificate: BatchCertificate<N>,
) -> Result<()> {
Expand Down Expand Up @@ -588,72 +591,76 @@ impl<N: Network> BFT<N> {
Ok(subdag) => subdag,
Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
};
// Initialize a map for the deduped transmissions.
let mut transmissions = IndexMap::new();
// Start from the oldest leader certificate.
for certificate in commit_subdag.values().flatten() {
// Retrieve the transmissions.
for transmission_id in certificate.transmission_ids() {
// If the transmission already exists in the map, skip it.
if transmissions.contains_key(transmission_id) {
continue;
}
// If the transmission already exists in the ledger, skip it.
// Note: On failure to read from the ledger, we skip including this transmission, out of safety.
if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
continue;
// If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
if !IS_SYNCING {
// Initialize a map for the deduped transmissions.
let mut transmissions = IndexMap::new();
// Start from the oldest leader certificate.
for certificate in commit_subdag.values().flatten() {
// Retrieve the transmissions.
for transmission_id in certificate.transmission_ids() {
// If the transmission already exists in the map, skip it.
if transmissions.contains_key(transmission_id) {
continue;
}
// If the transmission already exists in the ledger, skip it.
// Note: On failure to read from the ledger, we skip including this transmission, out of safety.
if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
continue;
}
// Retrieve the transmission.
let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
bail!(
"BFT failed to retrieve transmission '{}' from round {}",
fmt_id(transmission_id),
certificate.round()
);
};
// Add the transmission to the set.
transmissions.insert(*transmission_id, transmission);
}
// Retrieve the transmission.
let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
bail!(
"BFT failed to retrieve transmission '{}' from round {}",
fmt_id(transmission_id),
certificate.round()
);
};
// Add the transmission to the set.
transmissions.insert(*transmission_id, transmission);
}
}
// Trigger consensus, as this will build a new block for the ledger.
// Construct the subdag.
let subdag = Subdag::from(commit_subdag.clone())?;
// Retrieve the anchor round.
let anchor_round = subdag.anchor_round();
// Retrieve the number of transmissions.
let num_transmissions = transmissions.len();
// Retrieve metadata about the subdag.
let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();

// Ensure the subdag anchor round matches the leader round.
ensure!(
anchor_round == leader_round,
"BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
);

// Trigger consensus.
if let Some(consensus_sender) = self.consensus_sender.get() {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the subdag and transmissions to consensus.
consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
// Await the callback to continue.
match callback_receiver.await {
Ok(Ok(())) => (), // continue
Ok(Err(e)) => {
error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
return Ok(());
}
Err(e) => {
error!("BFT failed to receive the callback for round {anchor_round} - {e}");
return Ok(());
// Trigger consensus, as this will build a new block for the ledger.
// Construct the subdag.
let subdag = Subdag::from(commit_subdag.clone())?;
// Retrieve the anchor round.
let anchor_round = subdag.anchor_round();
// Retrieve the number of transmissions.
let num_transmissions = transmissions.len();
// Retrieve metadata about the subdag.
let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();

// Ensure the subdag anchor round matches the leader round.
ensure!(
anchor_round == leader_round,
"BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
);

// Trigger consensus.
if let Some(consensus_sender) = self.consensus_sender.get() {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the subdag and transmissions to consensus.
consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
// Await the callback to continue.
match callback_receiver.await {
Ok(Ok(())) => (), // continue
Ok(Err(e)) => {
error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
return Ok(());
}
Err(e) => {
error!("BFT failed to receive the callback for round {anchor_round} - {e}");
return Ok(());
}
}
}

info!(
"\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
);
}

info!(
"\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
);
// Update the DAG, as the subdag was successfully included into a block.
let mut dag_write = self.dag.write();
for certificate in commit_subdag.values().flatten() {
Expand Down Expand Up @@ -784,7 +791,7 @@ impl<N: Network> BFT<N> {
self.spawn(async move {
while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
// Update the DAG with the certificate.
let result = self_.update_dag::<true>(certificate).await;
let result = self_.update_dag::<true, false>(certificate).await;
// Send the callback **after** updating the DAG.
// Note: We must await the DAG update before proceeding.
callback.send(result).ok();
Expand All @@ -804,7 +811,7 @@ impl<N: Network> BFT<N> {
self.spawn(async move {
while let Some((certificate, callback)) = rx_sync_bft.recv().await {
// Update the DAG with the certificate.
let result = self_.update_dag::<true>(certificate).await;
let result = self_.update_dag::<true, true>(certificate).await;
// Send the callback **after** updating the DAG.
// Note: We must await the DAG update before proceeding.
callback.send(result).ok();
Expand Down Expand Up @@ -1175,7 +1182,7 @@ mod tests {

// Insert the previous certificates into the BFT.
for certificate in previous_certificates.clone() {
assert!(bft.update_dag::<false>(certificate).await.is_ok());
assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
}

// Ensure this call succeeds and returns all given certificates.
Expand Down Expand Up @@ -1205,7 +1212,7 @@ mod tests {

// Insert the previous certificates into the BFT.
for certificate in previous_certificates.clone() {
assert!(bft.update_dag::<false>(certificate).await.is_ok());
assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
}

// Ensure this call succeeds and returns all given certificates.
Expand Down Expand Up @@ -1326,11 +1333,11 @@ mod tests {

// Insert the certificates into the BFT.
for certificate in certificates {
assert!(bft.update_dag::<false>(certificate).await.is_ok());
assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
}

// Commit the leader certificate.
bft.commit_leader_certificate::<false>(leader_certificate).await.unwrap();
bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();

// Ensure that the `gc_round` has been updated.
assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
Expand Down Expand Up @@ -1390,11 +1397,11 @@ mod tests {

// Insert the previous certificates into the BFT.
for certificate in certificates.clone() {
assert!(bft.update_dag::<false>(certificate).await.is_ok());
assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
}

// Commit the leader certificate.
bft.commit_leader_certificate::<false>(leader_certificate.clone()).await.unwrap();
bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();

// Simulate a bootup of the BFT.

Expand Down Expand Up @@ -1562,17 +1569,17 @@ mod tests {

// Insert the certificates into the BFT without bootup.
for certificate in pre_shutdown_certificates.clone() {
assert!(bft.update_dag::<false>(certificate).await.is_ok());
assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
}

// Insert the post shutdown certificates into the BFT without bootup.
for certificate in post_shutdown_certificates.clone() {
assert!(bft.update_dag::<false>(certificate).await.is_ok());
assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
}
// Commit the second leader certificate.
let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
bft.commit_leader_certificate::<false>(next_leader_certificate.clone()).await.unwrap();
bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();

// Simulate a bootup of the BFT.

Expand All @@ -1590,14 +1597,14 @@ mod tests {
bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
}
for certificate in post_shutdown_certificates.clone() {
assert!(bootup_bft.update_dag::<false>(certificate).await.is_ok());
assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
}
// Commit the second leader certificate.
let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
let commit_subdag_metadata_bootup =
commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
bootup_bft.commit_leader_certificate::<false>(next_leader_certificate.clone()).await.unwrap();
bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();

// Check that the final state of both BFTs is the same.

Expand Down Expand Up @@ -1777,7 +1784,7 @@ mod tests {

// Insert the post shutdown certificates into the DAG.
for certificate in post_shutdown_certificates.clone() {
assert!(bootup_bft.update_dag::<false>(certificate).await.is_ok());
assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
}

// Get the next leader certificate to commit.
Expand Down
Loading

0 comments on commit dde9e17

Please sign in to comment.