-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
commit_reliable_broadcast.rs
103 lines (90 loc) · 2.83 KB
/
commit_reliable_broadcast.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0
use crate::{network::NetworkSender, network_interface::ConsensusMsg};
use anyhow::bail;
use aptos_consensus_types::{
common::Author,
experimental::{commit_decision::CommitDecision, commit_vote::CommitVote},
};
use aptos_reliable_broadcast::{BroadcastStatus, RBMessage, RBNetworkSender};
use aptos_types::validator_verifier::ValidatorVerifier;
use async_trait::async_trait;
use futures::future::AbortHandle;
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, time::Duration};
/// Network message for the pipeline phase.
///
/// Only `Vote` and `Decision` pass [`CommitMessage::verify`]; an `Ack` is
/// rejected there.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum CommitMessage {
    /// Vote on execution result.
    Vote(CommitVote),
    /// Quorum proof on execution result.
    Decision(CommitDecision),
    /// Ack on either vote or decision; carries only a unit payload.
    Ack(()),
}
impl CommitMessage {
    /// Verify the signatures on the message.
    ///
    /// Votes and decisions are checked by delegating to their own `verify`
    /// methods with the supplied `verifier`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the delegated `verify` call produces, or an
    /// error if the message is an `Ack`, which this method always rejects.
    pub fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
        match self {
            // An ack has nothing to verify and is never accepted here.
            Self::Ack(_) => bail!("Unexpected ack in incoming commit message"),
            Self::Vote(commit_vote) => commit_vote.verify(verifier),
            Self::Decision(commit_decision) => commit_decision.verify(verifier),
        }
    }
}
// Empty impl: opts `CommitMessage` into the `RBMessage` trait from `aptos_reliable_broadcast`.
impl RBMessage for CommitMessage {}
/// Broadcast status that tracks which validators have not acked yet.
///
/// Each accepted ack removes its sender from the set; the broadcast is
/// considered aggregated once the set becomes empty.
pub struct AckState {
    /// Validators from which an ack is still outstanding.
    validators: HashSet<Author>,
}
impl AckState {
pub fn new(validators: impl Iterator<Item = Author>) -> Self {
Self {
validators: validators.collect(),
}
}
}
impl BroadcastStatus<CommitMessage> for AckState {
    type Ack = CommitMessage;
    type Aggregated = ();
    type Message = CommitMessage;

    /// Records an ack from `peer`; the ack payload itself is not inspected.
    ///
    /// Returns `Ok(Some(()))` when this ack was the last one outstanding and
    /// `Ok(None)` while other validators are still pending.
    ///
    /// # Errors
    ///
    /// Fails when `peer` is not in the pending set. That covers both an
    /// author that was never expected and one whose ack was already recorded.
    fn add(&mut self, peer: Author, _ack: Self::Ack) -> anyhow::Result<Option<Self::Aggregated>> {
        // `remove` returns false if the peer was not pending.
        if !self.validators.remove(&peer) {
            bail!("Unknown author: {}", peer);
        }

        let all_acked = self.validators.is_empty();
        Ok(if all_acked { Some(()) } else { None })
    }
}
#[async_trait]
impl RBNetworkSender<CommitMessage> for NetworkSender {
    /// Sends `message` to `receiver` as a `ConsensusMsg::CommitMessage` RPC
    /// and returns the ack contained in the reply.
    ///
    /// # Errors
    ///
    /// Propagates any error from `send_rpc`, and fails if the reply is
    /// anything other than a `ConsensusMsg::CommitMessage` wrapping
    /// `CommitMessage::Ack`.
    async fn send_rb_rpc(
        &self,
        receiver: Author,
        message: CommitMessage,
        timeout_duration: Duration,
    ) -> anyhow::Result<CommitMessage> {
        let request = ConsensusMsg::CommitMessage(Box::new(message));
        let reply = self.send_rpc(receiver, request, timeout_duration).await?;

        // Only an ack is a valid answer; everything else falls through to the error.
        if let ConsensusMsg::CommitMessage(inner) = reply {
            if matches!(*inner, CommitMessage::Ack(_)) {
                return Ok(*inner);
            }
        }
        bail!("Invalid response to request")
    }
}
/// Guard that calls `abort()` on the wrapped [`AbortHandle`] when it is dropped.
pub struct DropGuard {
    /// Handle that is aborted in this guard's `Drop` implementation.
    abort_handle: AbortHandle,
}
impl DropGuard {
pub fn new(abort_handle: AbortHandle) -> Self {
Self { abort_handle }
}
}
impl Drop for DropGuard {
    /// Calls `abort()` on the wrapped handle as the guard goes away.
    fn drop(&mut self) {
        let Self { abort_handle } = self;
        abort_handle.abort();
    }
}