Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jwk #4: jwk update quorum certification #11857

Merged
merged 25 commits into from
Feb 5, 2024
Merged
2 changes: 2 additions & 0 deletions crates/aptos-jwk-consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ pub fn start_jwk_consensus_runtime(

pub mod network;
pub mod network_interface;
pub mod observation_aggregation;
pub mod types;
pub mod update_certifier;
106 changes: 106 additions & 0 deletions crates/aptos-jwk-consensus/src/observation_aggregation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright © Aptos Foundation

use crate::types::{
JWKConsensusMsg, ObservedUpdate, ObservedUpdateRequest, ObservedUpdateResponse,
};
use anyhow::{anyhow, ensure};
use aptos_consensus_types::common::Author;
use aptos_crypto::bls12381::Signature;
use aptos_infallible::Mutex;
use aptos_reliable_broadcast::BroadcastStatus;
use aptos_types::{
aggregate_signature::PartialSignatures,
epoch_state::EpochState,
jwks::{ProviderJWKs, QuorumCertifiedUpdate},
};
use move_core_types::account_address::AccountAddress;
use std::{collections::BTreeSet, sync::Arc};

/// The aggregation state of reliable broadcast where a validator broadcast JWK observation requests
/// and produce quorum-certified JWK updates.
pub struct ObservationAggregationState {
epoch_state: Arc<EpochState>,
local_view: ProviderJWKs,
inner_state: Mutex<PartialSignatures>,
}

impl ObservationAggregationState {
pub fn new(epoch_state: Arc<EpochState>, local_view: ProviderJWKs) -> Self {
Self {
epoch_state,
local_view,
inner_state: Mutex::new(PartialSignatures::empty()),
}
}
}

impl BroadcastStatus<JWKConsensusMsg> for Arc<ObservationAggregationState> {
type Aggregated = QuorumCertifiedUpdate;
type Message = ObservedUpdateRequest;
type Response = ObservedUpdateResponse;

fn add(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reliable broadcast may panic, if a validator has a different observation than anyone else? If the rb receives all response but does not aggregate it will panic.

Copy link
Contributor

@danielxiangzl danielxiangzl Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For fixes, either rb keeps fetching if the response is different than mine, or allow aggregation to fail and return None. In the later case the validator needs to retry rb to fetch again. Let's discuss what is better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NVM, I saw you already did the first one.

&self,
sender: Author,
response: Self::Response,
) -> anyhow::Result<Option<Self::Aggregated>> {
let ObservedUpdateResponse { epoch, update } = response;
let ObservedUpdate {
author,
observed: peer_view,
signature,
} = update;
ensure!(
epoch == self.epoch_state.epoch,
"adding peer observation failed with invalid epoch",
);
ensure!(
author == sender,
"adding peer observation failed with mismatched author",
);

let mut partial_sigs = self.inner_state.lock();
if partial_sigs.contains_voter(&sender) {
return Ok(None);
}

ensure!(
self.local_view == peer_view,
"adding peer observation failed with mismatched view"
);

// Verify peer signature.
self.epoch_state
.verifier
.verify(sender, &peer_view, &signature)?;

// All checks passed. Aggregating.
partial_sigs.add_signature(sender, signature);
let voters: BTreeSet<AccountAddress> = partial_sigs.signatures().keys().copied().collect();
if self
.epoch_state
.verifier
.check_voting_power(voters.iter(), true)
.is_err()
{
return Ok(None);
}
let multi_sig = Signature::aggregate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a verifier.aggregate_signature function

partial_sigs
.signatures()
.values()
.cloned()
.collect::<Vec<_>>(),
)
.map_err(|e| anyhow!("jwk update certification failed with sig agg error: {e}"))?;

Ok(Some(QuorumCertifiedUpdate {
authors: voters,
update: peer_view,
multi_sig,
}))
}
}

#[cfg(test)]
mod tests;
143 changes: 143 additions & 0 deletions crates/aptos-jwk-consensus/src/observation_aggregation/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright © Aptos Foundation

use crate::{
observation_aggregation::ObservationAggregationState,
types::{ObservedUpdate, ObservedUpdateResponse},
};
use aptos_bitvec::BitVec;
use aptos_crypto::{bls12381, SigningKey, Uniform};
use aptos_reliable_broadcast::BroadcastStatus;
use aptos_types::{
aggregate_signature::AggregateSignature,
epoch_state::EpochState,
jwks::{
jwk::{JWKMoveStruct, JWK},
unsupported::UnsupportedJWK,
ProviderJWKs, QuorumCertifiedUpdate,
},
validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier},
};
use move_core_types::account_address::AccountAddress;
use std::sync::Arc;

#[test]
fn test_observation_aggregation_state() {
let num_validators = 5;
let epoch = 999;
let addrs: Vec<AccountAddress> = (0..num_validators)
.map(|_| AccountAddress::random())
.collect();
let private_keys: Vec<bls12381::PrivateKey> = (0..num_validators)
.map(|_| bls12381::PrivateKey::generate_for_testing())
.collect();
let public_keys: Vec<bls12381::PublicKey> = (0..num_validators)
.map(|i| bls12381::PublicKey::from(&private_keys[i]))
.collect();
let voting_powers = [1, 1, 1, 6, 6]; // total voting power: 15, default threshold: 11
let validator_infos: Vec<ValidatorConsensusInfo> = (0..num_validators)
.map(|i| ValidatorConsensusInfo::new(addrs[i], public_keys[i].clone(), voting_powers[i]))
.collect();
let verifier = ValidatorVerifier::new(validator_infos);
let epoch_state = Arc::new(EpochState { epoch, verifier });
let view_0 = ProviderJWKs {
issuer: b"https::/alice.com".to_vec(),
version: 123,
jwks: vec![JWKMoveStruct::from(JWK::Unsupported(
UnsupportedJWK::new_for_testing("id1", "payload1"),
))],
};
let view_1 = ProviderJWKs {
issuer: b"https::/alice.com".to_vec(),
version: 123,
jwks: vec![JWKMoveStruct::from(JWK::Unsupported(
UnsupportedJWK::new_for_testing("id2", "payload2"),
))],
};
let ob_agg_state = Arc::new(ObservationAggregationState::new(
epoch_state.clone(),
view_0.clone(),
));

// `ObservedUpdate` with incorrect epoch should be rejected.
let result = ob_agg_state.add(addrs[0], ObservedUpdateResponse {
epoch: 998,
update: ObservedUpdate {
author: addrs[0],
observed: view_0.clone(),
signature: private_keys[0].sign(&view_0).unwrap(),
},
});
assert!(result.is_err());

// `ObservedUpdate` authored by X but sent by Y should be rejected.
let result = ob_agg_state.add(addrs[1], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[0],
observed: view_0.clone(),
signature: private_keys[0].sign(&view_0).unwrap(),
},
});
assert!(result.is_err());

// `ObservedUpdate` that cannot be verified should be rejected.
let result = ob_agg_state.add(addrs[2], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[2],
observed: view_0.clone(),
signature: private_keys[2].sign(&view_1).unwrap(),
},
});
assert!(result.is_err());

// Good `ObservedUpdate` should be accepted.
let result = ob_agg_state.add(addrs[3], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[3],
observed: view_0.clone(),
signature: private_keys[3].sign(&view_0).unwrap(),
},
});
assert!(matches!(result, Ok(None)));

// `ObservedUpdate` from contributed author should be ignored.
let result = ob_agg_state.add(addrs[3], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[3],
observed: view_0.clone(),
signature: private_keys[3].sign(&view_0).unwrap(),
},
});
assert!(matches!(result, Ok(None)));

// Quorum-certified update should be returned if after adding an `ObservedUpdate`, the threshold is exceeded.
let result = ob_agg_state.add(addrs[4], ObservedUpdateResponse {
epoch: 999,
update: ObservedUpdate {
author: addrs[4],
observed: view_0.clone(),
signature: private_keys[4].sign(&view_0).unwrap(),
},
});
let QuorumCertifiedUpdate {
authors,
update: observed,
multi_sig,
} = result.unwrap().unwrap();
assert_eq!(view_0, observed);
let bits: Vec<bool> = epoch_state
.verifier
.get_ordered_account_addresses()
.into_iter()
.map(|addr| authors.contains(&addr))
.collect();
let bit_vec = BitVec::from(bits);
let multi_sig = AggregateSignature::new(bit_vec, Some(multi_sig));
assert!(epoch_state
.verifier
.verify_multi_signatures(&observed, &multi_sig)
.is_ok());
}
63 changes: 63 additions & 0 deletions crates/aptos-jwk-consensus/src/update_certifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright © Aptos Foundation

use crate::{
observation_aggregation::ObservationAggregationState,
types::{JWKConsensusMsg, ObservedUpdateRequest},
};
use aptos_channels::aptos_channel;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_types::{
epoch_state::EpochState,
jwks::{ProviderJWKs, QuorumCertifiedUpdate},
};
use futures_util::future::{AbortHandle, Abortable};
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;

/// A sub-process of the whole JWK consensus process.
/// Once invoked by `JWKConsensusManager` to `start_produce`,
/// it starts producing a `QuorumCertifiedUpdate` and returns an abort handle.
/// Once an `QuorumCertifiedUpdate` is available, it is sent back via a channel given earlier.
pub trait TUpdateCertifier: Send + Sync {
fn start_produce(
&self,
epoch_state: Arc<EpochState>,
payload: ProviderJWKs,
qc_update_tx: aptos_channel::Sender<(), QuorumCertifiedUpdate>,
) -> AbortHandle;
}

pub struct CertifiedUpdateProducer {
reliable_broadcast: Arc<ReliableBroadcast<JWKConsensusMsg, ExponentialBackoff>>,
}

impl CertifiedUpdateProducer {
pub fn new(reliable_broadcast: ReliableBroadcast<JWKConsensusMsg, ExponentialBackoff>) -> Self {
Self {
reliable_broadcast: Arc::new(reliable_broadcast),
}
}
}

impl TUpdateCertifier for CertifiedUpdateProducer {
fn start_produce(
&self,
epoch_state: Arc<EpochState>,
payload: ProviderJWKs,
qc_update_tx: aptos_channel::Sender<(), QuorumCertifiedUpdate>,
) -> AbortHandle {
let rb = self.reliable_broadcast.clone();
let req = ObservedUpdateRequest {
epoch: epoch_state.epoch,
issuer: payload.issuer.clone(),
};
let agg_state = Arc::new(ObservationAggregationState::new(epoch_state, payload));
let task = async move {
let qc_update = rb.broadcast(req, agg_state).await;
let _ = qc_update_tx.push((), qc_update);
};
let (abort_handle, abort_registration) = AbortHandle::new_pair();
tokio::spawn(Abortable::new(task, abort_registration));
abort_handle
}
}
4 changes: 4 additions & 0 deletions types/src/aggregate_signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,8 @@ impl PartialSignatures {
pub fn signatures(&self) -> &BTreeMap<AccountAddress, bls12381::Signature> {
&self.signatures
}

pub fn contains_voter(&self, voter: &AccountAddress) -> bool {
self.signatures.contains_key(voter)
}
}
Loading