Skip to content

Commit

Permalink
dkg manager update from randomnet
Browse files Browse the repository at this point in the history
  • Loading branch information
zjma committed Feb 28, 2024
1 parent 2392d2a commit e675e73
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 185 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dkg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ aptos-validator-transaction-pool = { workspace = true }
async-trait = { workspace = true }
bcs = { workspace = true }
bytes = { workspace = true }
fail = { workspace = true }
futures = { workspace = true }
futures-channel = { workspace = true }
futures-util = { workspace = true }
Expand Down
35 changes: 31 additions & 4 deletions dkg/src/agg_trx_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use crate::{
transcript_aggregation::TranscriptAggregationState, types::DKGTranscriptRequest, DKGMessage,
};
use aptos_channels::aptos_channel::Sender;
use aptos_logger::info;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_types::{dkg::DKGTrait, epoch_state::EpochState};
use futures::future::AbortHandle;
use futures_util::future::Abortable;
use std::sync::Arc;
use move_core_types::account_address::AccountAddress;
use std::{sync::Arc, time::Duration};
use tokio_retry::strategy::ExponentialBackoff;

/// A sub-process of the whole DKG process.
Expand All @@ -18,6 +20,8 @@ use tokio_retry::strategy::ExponentialBackoff;
pub trait TAggTranscriptProducer<S: DKGTrait>: Send + Sync {
fn start_produce(
&self,
start_time: Duration,
my_addr: AccountAddress,
epoch_state: Arc<EpochState>,
dkg_config: S::PublicParams,
agg_trx_tx: Option<Sender<(), S::Transcript>>,
Expand All @@ -40,17 +44,38 @@ impl AggTranscriptProducer {
impl<DKG: DKGTrait + 'static> TAggTranscriptProducer<DKG> for AggTranscriptProducer {
fn start_produce(
&self,
start_time: Duration,
my_addr: AccountAddress,
epoch_state: Arc<EpochState>,
params: DKG::PublicParams,
agg_trx_tx: Option<Sender<(), DKG::Transcript>>,
) -> AbortHandle {
let epoch = epoch_state.epoch;
let rb = self.reliable_broadcast.clone();
let req = DKGTranscriptRequest::new(epoch_state.epoch);
let agg_state = Arc::new(TranscriptAggregationState::<DKG>::new(params, epoch_state));
let agg_state = Arc::new(TranscriptAggregationState::<DKG>::new(
start_time,
my_addr,
params,
epoch_state,
));
let task = async move {
let agg_trx = rb.broadcast(req, agg_state).await;
if let Some(tx) = agg_trx_tx {
let _ = tx.push((), agg_trx); // If the `DKGManager` was dropped, this send will fail by design.
info!(
epoch = epoch,
my_addr = my_addr,
"[DKG] aggregated transcript locally"
);
if let Err(e) = agg_trx_tx
.expect("[DKG] agg_trx_tx should be available")
.push((), agg_trx)
{
// If the `DKGManager` was dropped, this send will fail by design.
info!(
epoch = epoch,
my_addr = my_addr,
"[DKG] Failed to send aggregated transcript to DKGManager, maybe DKGManager stopped and channel dropped: {:?}", e
);
}
};
let (abort_handle, abort_registration) = AbortHandle::new_pair();
Expand All @@ -66,6 +91,8 @@ pub struct DummyAggTranscriptProducer {}
impl<DKG: DKGTrait> TAggTranscriptProducer<DKG> for DummyAggTranscriptProducer {
fn start_produce(
&self,
_start_time: Duration,
_my_addr: AccountAddress,
_epoch_state: Arc<EpochState>,
_dkg_config: DKG::PublicParams,
_agg_trx_tx: Option<Sender<(), DKG::Transcript>>,
Expand Down
11 changes: 10 additions & 1 deletion dkg/src/counters.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright © Aptos Foundation

use aptos_metrics_core::{register_int_gauge, IntGauge};
use aptos_metrics_core::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge};
use once_cell::sync::Lazy;

/// Count of the pending messages sent to itself in the channel
Expand All @@ -11,3 +11,12 @@ pub static PENDING_SELF_MESSAGES: Lazy<IntGauge> = Lazy::new(|| {
)
.unwrap()
});

pub static DKG_STAGE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_dkg_session_stage_seconds",
"How long it takes to reach different DKG stages",
&["dealer", "stage"]
)
.unwrap()
});
Loading

0 comments on commit e675e73

Please sign in to comment.