Skip to content

Commit

Permalink
jwk update quorum certification
Browse files Browse the repository at this point in the history
  • Loading branch information
zjma committed Feb 1, 2024
1 parent 99cc957 commit b019753
Show file tree
Hide file tree
Showing 4 changed files with 318 additions and 0 deletions.
65 changes: 65 additions & 0 deletions crates/aptos-jwk-consensus/src/certified_update_producer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright © Aptos Foundation

use crate::{
observation_aggregation::ObservationAggregationState,
types::{JWKConsensusMsg, ObservedUpdateRequest},
};
use aptos_channels::aptos_channel;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_types::{
epoch_state::EpochState,
jwks::{ProviderJWKs, QuorumCertifiedUpdate},
};
use futures_util::future::{AbortHandle, Abortable};
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;

/// A sub-process of the whole JWK consensus process.
/// Once invoked by `JWKConsensusManager` to `start_produce`,
/// it starts producing a `QuorumCertifiedUpdate` and returns an abort handle.
/// Once an `QuorumCertifiedUpdate` is available, it is sent back via a channel given earlier.
pub trait CertifiedUpdateProducer: Send + Sync {
fn start_produce(
&self,
epoch_state: Arc<EpochState>,
payload: ProviderJWKs,
qc_update_tx: Option<aptos_channel::Sender<(), QuorumCertifiedUpdate>>,
) -> AbortHandle;
}

pub struct RealCertifiedUpdateProducer {
reliable_broadcast: Arc<ReliableBroadcast<JWKConsensusMsg, ExponentialBackoff>>,
}

impl RealCertifiedUpdateProducer {
pub fn new(reliable_broadcast: ReliableBroadcast<JWKConsensusMsg, ExponentialBackoff>) -> Self {
Self {
reliable_broadcast: Arc::new(reliable_broadcast),
}
}

Check warning on line 39 in crates/aptos-jwk-consensus/src/certified_update_producer.rs

View check run for this annotation

Codecov / codecov/patch

crates/aptos-jwk-consensus/src/certified_update_producer.rs#L35-L39

Added lines #L35 - L39 were not covered by tests
}

impl CertifiedUpdateProducer for RealCertifiedUpdateProducer {
fn start_produce(
&self,
epoch_state: Arc<EpochState>,
payload: ProviderJWKs,
qc_update_tx: Option<aptos_channel::Sender<(), QuorumCertifiedUpdate>>,
) -> AbortHandle {
let rb = self.reliable_broadcast.clone();
let req = ObservedUpdateRequest {
epoch: epoch_state.epoch,
issuer: payload.issuer.clone(),
};
let agg_state = Arc::new(ObservationAggregationState::new(epoch_state, payload));
let task = async move {
let qc_update = rb.broadcast(req, agg_state).await;
if let Some(tx) = qc_update_tx {
let _ = tx.push((), qc_update);
}
};
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(Abortable::new(task, abort_registration));
abort_handle
}

Check warning on line 64 in crates/aptos-jwk-consensus/src/certified_update_producer.rs

View check run for this annotation

Codecov / codecov/patch

crates/aptos-jwk-consensus/src/certified_update_producer.rs#L43-L64

Added lines #L43 - L64 were not covered by tests
}
2 changes: 2 additions & 0 deletions crates/aptos-jwk-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub fn start_jwk_consensus_runtime(
runtime
}

pub mod certified_update_producer;
pub mod network;
pub mod network_interface;
pub mod observation_aggregation;
pub mod types;
108 changes: 108 additions & 0 deletions crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright © Aptos Foundation

use crate::types::{
JWKConsensusMsg, ObservedUpdate, ObservedUpdateRequest, ObservedUpdateResponse,
};
use anyhow::ensure;
use aptos_consensus_types::common::Author;
use aptos_crypto::bls12381;
use aptos_infallible::Mutex;
use aptos_reliable_broadcast::BroadcastStatus;
use aptos_types::{
epoch_state::EpochState,
jwks::{ProviderJWKs, QuorumCertifiedUpdate},
};
use move_core_types::account_address::AccountAddress;
use std::{collections::HashSet, sync::Arc};

/// The aggregation state of reliable broadcast where a validator broadcast JWK observation requests
/// and produce quorum-certified JWK updates.
pub struct ObservationAggregationState {
epoch_state: Arc<EpochState>,
local_view: ProviderJWKs,
inner_state: Mutex<InnerState>,
}

#[derive(Default)]
struct InnerState {
pub contributors: HashSet<AccountAddress>,
pub multi_sig: Option<bls12381::Signature>,
}

impl ObservationAggregationState {
pub fn new(epoch_state: Arc<EpochState>, local_view: ProviderJWKs) -> Self {
Self {
epoch_state,
local_view,
inner_state: Mutex::new(InnerState::default()),
}
}
}

impl BroadcastStatus<JWKConsensusMsg> for Arc<ObservationAggregationState> {
type Aggregated = QuorumCertifiedUpdate;
type Message = ObservedUpdateRequest;
type Response = ObservedUpdateResponse;

fn add(
&self,
sender: Author,
response: Self::Response,
) -> anyhow::Result<Option<Self::Aggregated>> {
let ObservedUpdateResponse { epoch, update } = response;
let ObservedUpdate {
author,
observed: peer_view,
signature,
} = update;
ensure!(
epoch == self.epoch_state.epoch,
"adding peer observation failed with invalid epoch",
);
ensure!(
author == sender,
"adding peer observation failed with mismatched author",
);

let mut aggregator = self.inner_state.lock();
if aggregator.contributors.contains(&sender) {
return Ok(None);
}

ensure!(
self.local_view == peer_view,
"adding peer observation failed with mismatched view"

Check warning on line 74 in crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs#L74

Added line #L74 was not covered by tests
);

// Verify the quorum-cert.
self.epoch_state
.verifier
.verify(sender, &peer_view, &signature)?;

// All checks passed. Aggregating.
aggregator.contributors.insert(sender);
let new_multi_sig = if let Some(existing) = aggregator.multi_sig.take() {
bls12381::Signature::aggregate(vec![existing, signature])?
} else {
signature
};

let maybe_qc_update = self
.epoch_state
.verifier
.check_voting_power(aggregator.contributors.iter(), true)
.ok()
.map(|_| QuorumCertifiedUpdate {
authors: aggregator.contributors.clone().into_iter().collect(),
update: peer_view,
multi_sig: new_multi_sig.clone(),
});

aggregator.multi_sig = Some(new_multi_sig);

Ok(maybe_qc_update)
}
}

#[cfg(test)]
mod tests;
143 changes: 143 additions & 0 deletions crates/aptos-jwk-consensus/src/observation_aggregation/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright © Aptos Foundation

use crate::{
observation_aggregation::ObservationAggregationState,
types::{ObservedUpdate, ObservedUpdateResponse},
};
use aptos_bitvec::BitVec;
use aptos_crypto::{bls12381, SigningKey, Uniform};
use aptos_reliable_broadcast::BroadcastStatus;
use aptos_types::{
aggregate_signature::AggregateSignature,
epoch_state::EpochState,
jwks::{
jwk::{JWKMoveStruct, JWK},
unsupported::UnsupportedJWK,
ProviderJWKs, QuorumCertifiedUpdate,
},
validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier},
};
use move_core_types::account_address::AccountAddress;
use std::sync::Arc;

#[test]
fn test_observation_aggregation_state() {
let num_validators = 5;
let epoch = 999;
let addrs: Vec<AccountAddress> = (0..num_validators)
.map(|_| AccountAddress::random())
.collect();
let private_keys: Vec<bls12381::PrivateKey> = (0..num_validators)
.map(|_| bls12381::PrivateKey::generate_for_testing())
.collect();
let public_keys: Vec<bls12381::PublicKey> = (0..num_validators)
.map(|i| bls12381::PublicKey::from(&private_keys[i]))
.collect();
let voting_powers = [1, 1, 1, 6, 6]; // total voting power: 15, default threshold: 11
let validator_infos: Vec<ValidatorConsensusInfo> = (0..num_validators)
.map(|i| ValidatorConsensusInfo::new(addrs[i], public_keys[i].clone(), voting_powers[i]))
.collect();
let verifier = ValidatorVerifier::new(validator_infos);
let epoch_state = Arc::new(EpochState { epoch, verifier });
let view_0 = ProviderJWKs {
issuer: b"https::/alice.com".to_vec(),
version: 123,
jwks: vec![JWKMoveStruct::from(JWK::Unsupported(
UnsupportedJWK::new_for_testing("id1", "payload1"),
))],
};
let view_1 = ProviderJWKs {
issuer: b"https::/alice.com".to_vec(),
version: 123,
jwks: vec![JWKMoveStruct::from(JWK::Unsupported(
UnsupportedJWK::new_for_testing("id2", "payload2"),
))],
};
let ob_agg_state = Arc::new(ObservationAggregationState::new(
epoch_state.clone(),
view_0.clone(),
));

// `ObservedUpdate` with incorrect epoch should be rejected.
let result = ob_agg_state.add(addrs[0], ObservedUpdateResponse {
epoch: 998,
update: ObservedUpdate {
author: addrs[0],
observed: view_0.clone(),
signature: private_keys[0].sign(&view_0).unwrap(),
},
});
assert!(result.is_err());

// `ObservedUpdate` authored by X but sent by Y should be rejected.
let result = ob_agg_state.add(addrs[1], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[0],
observed: view_0.clone(),
signature: private_keys[0].sign(&view_0).unwrap(),
},
});
assert!(result.is_err());

// `ObservedUpdate` that cannot be verified should be rejected.
let result = ob_agg_state.add(addrs[2], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[2],
observed: view_0.clone(),
signature: private_keys[2].sign(&view_1).unwrap(),
},
});
assert!(result.is_err());

// Good `ObservedUpdate` should be accepted.
let result = ob_agg_state.add(addrs[3], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[3],
observed: view_0.clone(),
signature: private_keys[3].sign(&view_0).unwrap(),
},
});
assert!(matches!(result, Ok(None)));

// `ObservedUpdate` from contributed author should be ignored.
let result = ob_agg_state.add(addrs[3], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[3],
observed: view_0.clone(),
signature: private_keys[3].sign(&view_0).unwrap(),
},
});
assert!(matches!(result, Ok(None)));

// Quorum-certified update should be returned if after adding an `ObservedUpdate`, the threshold is exceeded.
let result = ob_agg_state.add(addrs[4], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[4],
observed: view_0.clone(),
signature: private_keys[4].sign(&view_0).unwrap(),
},
});
let QuorumCertifiedUpdate {
authors,
update: observed,
multi_sig,
} = result.unwrap().unwrap();
assert_eq!(view_0, observed);
let bits: Vec<bool> = epoch_state
.verifier
.get_ordered_account_addresses()
.into_iter()
.map(|addr| authors.contains(&addr))
.collect();
let bit_vec = BitVec::from(bits);
let multi_sig = AggregateSignature::new(bit_vec, Some(multi_sig));
assert!(epoch_state
.verifier
.verify_multi_signatures(&observed, &multi_sig)
.is_ok());
}

0 comments on commit b019753

Please sign in to comment.