From b019753fa80dfab6483eb66b87c7a0b90ca8910d Mon Sep 17 00:00:00 2001 From: "zhoujun.ma" Date: Thu, 1 Feb 2024 07:30:45 -0800 Subject: [PATCH] jwk update quorum certification --- .../src/certified_update_producer.rs | 65 ++++++++ crates/aptos-jwk-consensus/src/lib.rs | 2 + .../src/observation_aggregation/mod.rs | 108 +++++++++++++ .../src/observation_aggregation/tests.rs | 143 ++++++++++++++++++ 4 files changed, 318 insertions(+) create mode 100644 crates/aptos-jwk-consensus/src/certified_update_producer.rs create mode 100644 crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs create mode 100644 crates/aptos-jwk-consensus/src/observation_aggregation/tests.rs diff --git a/crates/aptos-jwk-consensus/src/certified_update_producer.rs b/crates/aptos-jwk-consensus/src/certified_update_producer.rs new file mode 100644 index 0000000000000..d60c241573f4e --- /dev/null +++ b/crates/aptos-jwk-consensus/src/certified_update_producer.rs @@ -0,0 +1,65 @@ +// Copyright © Aptos Foundation + +use crate::{ + observation_aggregation::ObservationAggregationState, + types::{JWKConsensusMsg, ObservedUpdateRequest}, +}; +use aptos_channels::aptos_channel; +use aptos_reliable_broadcast::ReliableBroadcast; +use aptos_types::{ + epoch_state::EpochState, + jwks::{ProviderJWKs, QuorumCertifiedUpdate}, +}; +use futures_util::future::{AbortHandle, Abortable}; +use std::sync::Arc; +use tokio_retry::strategy::ExponentialBackoff; + +/// A sub-process of the whole JWK consensus process. +/// Once invoked by `JWKConsensusManager` to `start_produce`, +/// it starts producing a `QuorumCertifiedUpdate` and returns an abort handle. +/// Once an `QuorumCertifiedUpdate` is available, it is sent back via a channel given earlier. +pub trait CertifiedUpdateProducer: Send + Sync { + fn start_produce( + &self, + epoch_state: Arc, + payload: ProviderJWKs, + qc_update_tx: Option>, + ) -> AbortHandle; +} + +pub struct RealCertifiedUpdateProducer { + reliable_broadcast: Arc>, +} + +impl RealCertifiedUpdateProducer { + pub fn new(reliable_broadcast: ReliableBroadcast) -> Self { + Self { + reliable_broadcast: Arc::new(reliable_broadcast), + } + } +} + +impl CertifiedUpdateProducer for RealCertifiedUpdateProducer { + fn start_produce( + &self, + epoch_state: Arc, + payload: ProviderJWKs, + qc_update_tx: Option>, + ) -> AbortHandle { + let rb = self.reliable_broadcast.clone(); + let req = ObservedUpdateRequest { + epoch: epoch_state.epoch, + issuer: payload.issuer.clone(), + }; + let agg_state = Arc::new(ObservationAggregationState::new(epoch_state, payload)); + let task = async move { + let qc_update = rb.broadcast(req, agg_state).await; + if let Some(tx) = qc_update_tx { + let _ = tx.push((), qc_update); + } + }; + let (abort_handle, abort_registration) = AbortHandle::new_pair(); + tokio::spawn(Abortable::new(task, abort_registration)); + abort_handle + } +} diff --git a/crates/aptos-jwk-consensus/src/lib.rs b/crates/aptos-jwk-consensus/src/lib.rs index 737b1a1f8a1ca..d51816ec21376 100644 --- a/crates/aptos-jwk-consensus/src/lib.rs +++ b/crates/aptos-jwk-consensus/src/lib.rs @@ -29,6 +29,8 @@ pub fn start_jwk_consensus_runtime( runtime } +pub mod certified_update_producer; pub mod network; pub mod network_interface; +pub mod observation_aggregation; pub mod types; diff --git a/crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs b/crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs new file mode 100644 index 0000000000000..ae850996fa40e --- /dev/null +++ b/crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs @@ -0,0 +1,108 @@ +// Copyright © Aptos Foundation + +use crate::types::{ + JWKConsensusMsg, ObservedUpdate, ObservedUpdateRequest, ObservedUpdateResponse, +}; +use anyhow::ensure; +use aptos_consensus_types::common::Author; +use aptos_crypto::bls12381; +use aptos_infallible::Mutex; +use aptos_reliable_broadcast::BroadcastStatus; +use aptos_types::{ + epoch_state::EpochState, + jwks::{ProviderJWKs, QuorumCertifiedUpdate}, +}; +use move_core_types::account_address::AccountAddress; +use std::{collections::HashSet, sync::Arc}; + +/// The aggregation state of reliable broadcast where a validator broadcast JWK observation requests +/// and produce quorum-certified JWK updates. +pub struct ObservationAggregationState { + epoch_state: Arc, + local_view: ProviderJWKs, + inner_state: Mutex, +} + +#[derive(Default)] +struct InnerState { + pub contributors: HashSet, + pub multi_sig: Option, +} + +impl ObservationAggregationState { + pub fn new(epoch_state: Arc, local_view: ProviderJWKs) -> Self { + Self { + epoch_state, + local_view, + inner_state: Mutex::new(InnerState::default()), + } + } +} + +impl BroadcastStatus for Arc { + type Aggregated = QuorumCertifiedUpdate; + type Message = ObservedUpdateRequest; + type Response = ObservedUpdateResponse; + + fn add( + &self, + sender: Author, + response: Self::Response, + ) -> anyhow::Result> { + let ObservedUpdateResponse { epoch, update } = response; + let ObservedUpdate { + author, + observed: peer_view, + signature, + } = update; + ensure!( + epoch == self.epoch_state.epoch, + "adding peer observation failed with invalid epoch", + ); + ensure!( + author == sender, + "adding peer observation failed with mismatched author", + ); + + let mut aggregator = self.inner_state.lock(); + if aggregator.contributors.contains(&sender) { + return Ok(None); + } + + ensure!( + self.local_view == peer_view, + "adding peer observation failed with mismatched view" + ); + + // Verify the quorum-cert. + self.epoch_state + .verifier + .verify(sender, &peer_view, &signature)?; + + // All checks passed. Aggregating. + aggregator.contributors.insert(sender); + let new_multi_sig = if let Some(existing) = aggregator.multi_sig.take() { + bls12381::Signature::aggregate(vec![existing, signature])? + } else { + signature + }; + + let maybe_qc_update = self + .epoch_state + .verifier + .check_voting_power(aggregator.contributors.iter(), true) + .ok() + .map(|_| QuorumCertifiedUpdate { + authors: aggregator.contributors.clone().into_iter().collect(), + update: peer_view, + multi_sig: new_multi_sig.clone(), + }); + + aggregator.multi_sig = Some(new_multi_sig); + + Ok(maybe_qc_update) + } +} + +#[cfg(test)] +mod tests; diff --git a/crates/aptos-jwk-consensus/src/observation_aggregation/tests.rs b/crates/aptos-jwk-consensus/src/observation_aggregation/tests.rs new file mode 100644 index 0000000000000..60c4ca16db450 --- /dev/null +++ b/crates/aptos-jwk-consensus/src/observation_aggregation/tests.rs @@ -0,0 +1,143 @@ +// Copyright © Aptos Foundation + +use crate::{ + observation_aggregation::ObservationAggregationState, + types::{ObservedUpdate, ObservedUpdateResponse}, +}; +use aptos_bitvec::BitVec; +use aptos_crypto::{bls12381, SigningKey, Uniform}; +use aptos_reliable_broadcast::BroadcastStatus; +use aptos_types::{ + aggregate_signature::AggregateSignature, + epoch_state::EpochState, + jwks::{ + jwk::{JWKMoveStruct, JWK}, + unsupported::UnsupportedJWK, + ProviderJWKs, QuorumCertifiedUpdate, + }, + validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier}, +}; +use move_core_types::account_address::AccountAddress; +use std::sync::Arc; + +#[test] +fn test_observation_aggregation_state() { + let num_validators = 5; + let epoch = 999; + let addrs: Vec = (0..num_validators) + .map(|_| AccountAddress::random()) + .collect(); + let private_keys: Vec = (0..num_validators) + .map(|_| bls12381::PrivateKey::generate_for_testing()) + .collect(); + let public_keys: Vec = (0..num_validators) + .map(|i| bls12381::PublicKey::from(&private_keys[i])) + .collect(); + let voting_powers = [1, 1, 1, 6, 6]; // total voting power: 15, default threshold: 11 + let validator_infos: Vec = (0..num_validators) + .map(|i| ValidatorConsensusInfo::new(addrs[i], public_keys[i].clone(), voting_powers[i])) + .collect(); + let verifier = ValidatorVerifier::new(validator_infos); + let epoch_state = Arc::new(EpochState { epoch, verifier }); + let view_0 = ProviderJWKs { + issuer: b"https::/alice.com".to_vec(), + version: 123, + jwks: vec![JWKMoveStruct::from(JWK::Unsupported( + UnsupportedJWK::new_for_testing("id1", "payload1"), + ))], + }; + let view_1 = ProviderJWKs { + issuer: b"https::/alice.com".to_vec(), + version: 123, + jwks: vec![JWKMoveStruct::from(JWK::Unsupported( + UnsupportedJWK::new_for_testing("id2", "payload2"), + ))], + }; + let ob_agg_state = Arc::new(ObservationAggregationState::new( + epoch_state.clone(), + view_0.clone(), + )); + + // `ObservedUpdate` with incorrect epoch should be rejected. + let result = ob_agg_state.add(addrs[0], ObservedUpdateResponse { + epoch: 998, + update: ObservedUpdate { + author: addrs[0], + observed: view_0.clone(), + signature: private_keys[0].sign(&view_0).unwrap(), + }, + }); + assert!(result.is_err()); + + // `ObservedUpdate` authored by X but sent by Y should be rejected. + let result = ob_agg_state.add(addrs[1], ObservedUpdateResponse { + epoch: 999, + update: ObservedUpdate { + author: addrs[0], + observed: view_0.clone(), + signature: private_keys[0].sign(&view_0).unwrap(), + }, + }); + assert!(result.is_err()); + + // `ObservedUpdate` that cannot be verified should be rejected. + let result = ob_agg_state.add(addrs[2], ObservedUpdateResponse { + epoch: 999, + update: ObservedUpdate { + author: addrs[2], + observed: view_0.clone(), + signature: private_keys[2].sign(&view_1).unwrap(), + }, + }); + assert!(result.is_err()); + + // Good `ObservedUpdate` should be accepted. + let result = ob_agg_state.add(addrs[3], ObservedUpdateResponse { + epoch: 999, + update: ObservedUpdate { + author: addrs[3], + observed: view_0.clone(), + signature: private_keys[3].sign(&view_0).unwrap(), + }, + }); + assert!(matches!(result, Ok(None))); + + // `ObservedUpdate` from contributed author should be ignored. + let result = ob_agg_state.add(addrs[3], ObservedUpdateResponse { + epoch: 999, + update: ObservedUpdate { + author: addrs[3], + observed: view_0.clone(), + signature: private_keys[3].sign(&view_0).unwrap(), + }, + }); + assert!(matches!(result, Ok(None))); + + // Quorum-certified update should be returned if after adding an `ObservedUpdate`, the threshold is exceeded. + let result = ob_agg_state.add(addrs[4], ObservedUpdateResponse { + epoch: 999, + update: ObservedUpdate { + author: addrs[4], + observed: view_0.clone(), + signature: private_keys[4].sign(&view_0).unwrap(), + }, + }); + let QuorumCertifiedUpdate { + authors, + update: observed, + multi_sig, + } = result.unwrap().unwrap(); + assert_eq!(view_0, observed); + let bits: Vec = epoch_state + .verifier + .get_ordered_account_addresses() + .into_iter() + .map(|addr| authors.contains(&addr)) + .collect(); + let bit_vec = BitVec::from(bits); + let multi_sig = AggregateSignature::new(bit_vec, Some(multi_sig)); + assert!(epoch_state + .verifier + .verify_multi_signatures(&observed, &multi_sig) + .is_ok()); +}