From a205b1ee7b8f3f320f60c93446a0a9d00dfbfb8a Mon Sep 17 00:00:00 2001
From: "zhoujun.ma" <zhoujun@aptoslabs.com>
Date: Sun, 25 Feb 2024 04:02:03 -0800
Subject: [PATCH] dkg manager update from randomnet

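Bring back DKG manager fixes and observability improvements from randomnet
testing:

- De-generify `InnerState` and track stage times as `Duration` since the unix
  epoch instead of raw microseconds.
- Add an `aptos_dkg_session_stage_seconds` histogram and more logging to
  record how long each dealer takes to reach the deal, aggregation, proposal
  and epoch-change stages.
- Deliver `DKGStart` events over an `aptos_channel` (KLAST, capacity 1)
  instead of a `oneshot`, so the epoch manager can forward events repeatedly.
- Ignore, instead of resume, an unfinished session left by a stale dealer
  epoch.
- Start DKG only when both validator transactions and the
  `RECONFIGURE_WITH_DKG` feature are enabled.
- Rename `DKGMessage::NodeRequest`/`NodeResponse` to
  `TranscriptRequest`/`TranscriptResponse`.
- Add a `dkg::process_dkg_start_event` failpoint.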
---
 Cargo.lock                              |   1 +
 dkg/Cargo.toml                          |   1 +
 dkg/src/agg_trx_producer.rs             |  35 ++-
 dkg/src/counters.rs                     |  11 +-
 dkg/src/dkg_manager/mod.rs              | 397 ++++++++++++++++--------
 dkg/src/dkg_manager/tests.rs            |  37 ++-
 dkg/src/epoch_manager.rs                |  25 +-
 dkg/src/lib.rs                          |   3 +-
 dkg/src/transcript_aggregation/mod.rs   |  83 ++++-
 dkg/src/transcript_aggregation/tests.rs |  14 +
 dkg/src/types.rs                        |  12 +-
 11 files changed, 434 insertions(+), 185 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 087815a3429d8..c7cfd9bc7daca 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1313,6 +1313,7 @@ dependencies = [
  "async-trait",
  "bcs 0.1.4",
  "bytes",
+ "fail 0.5.1",
  "futures",
  "futures-channel",
  "futures-util",
diff --git a/dkg/Cargo.toml b/dkg/Cargo.toml
index dc338ebe86ca1..5ecd984e3bdb0 100644
--- a/dkg/Cargo.toml
+++ b/dkg/Cargo.toml
@@ -36,6 +36,7 @@ aptos-validator-transaction-pool = { workspace = true }
 async-trait = { workspace = true }
 bcs = { workspace = true }
 bytes = { workspace = true }
+fail = { workspace = true }
 futures = { workspace = true }
 futures-channel = { workspace = true }
 futures-util = { workspace = true }
diff --git a/dkg/src/agg_trx_producer.rs b/dkg/src/agg_trx_producer.rs
index 7d76caab5c294..b42ed929234f6 100644
--- a/dkg/src/agg_trx_producer.rs
+++ b/dkg/src/agg_trx_producer.rs
@@ -4,11 +4,13 @@ use crate::{
     transcript_aggregation::TranscriptAggregationState, types::DKGTranscriptRequest, DKGMessage,
 };
 use aptos_channels::aptos_channel::Sender;
+use aptos_logger::info;
 use aptos_reliable_broadcast::ReliableBroadcast;
 use aptos_types::{dkg::DKGTrait, epoch_state::EpochState};
 use futures::future::AbortHandle;
 use futures_util::future::Abortable;
-use std::sync::Arc;
+use move_core_types::account_address::AccountAddress;
+use std::{sync::Arc, time::Duration};
 use tokio_retry::strategy::ExponentialBackoff;
 
 /// A sub-process of the whole DKG process.
@@ -18,6 +20,8 @@ use tokio_retry::strategy::ExponentialBackoff;
 pub trait TAggTranscriptProducer<S: DKGTrait>: Send + Sync {
     fn start_produce(
         &self,
+        start_time: Duration,
+        my_addr: AccountAddress,
         epoch_state: Arc<EpochState>,
         dkg_config: S::PublicParams,
         agg_trx_tx: Option<Sender<(), S::Transcript>>,
@@ -40,17 +44,38 @@ impl AggTranscriptProducer {
 impl<DKG: DKGTrait + 'static> TAggTranscriptProducer<DKG> for AggTranscriptProducer {
     fn start_produce(
         &self,
+        start_time: Duration,
+        my_addr: AccountAddress,
         epoch_state: Arc<EpochState>,
         params: DKG::PublicParams,
         agg_trx_tx: Option<Sender<(), DKG::Transcript>>,
     ) -> AbortHandle {
+        let epoch = epoch_state.epoch;
         let rb = self.reliable_broadcast.clone();
         let req = DKGTranscriptRequest::new(epoch_state.epoch);
-        let agg_state = Arc::new(TranscriptAggregationState::<DKG>::new(params, epoch_state));
+        let agg_state = Arc::new(TranscriptAggregationState::<DKG>::new(
+            start_time,
+            my_addr,
+            params,
+            epoch_state,
+        ));
         let task = async move {
             let agg_trx = rb.broadcast(req, agg_state).await;
-            if let Some(tx) = agg_trx_tx {
-                let _ = tx.push((), agg_trx); // If the `DKGManager` was dropped, this send will fail by design.
+            info!(
+                epoch = epoch,
+                my_addr = my_addr,
+                "[DKG] aggregated transcript locally"
+            );
+            if let Err(e) = agg_trx_tx
+                .expect("[DKG] agg_trx_tx should be available")
+                .push((), agg_trx)
+            {
+                // If the `DKGManager` was dropped, this send will fail by design.
+                info!(
+                    epoch = epoch,
+                    my_addr = my_addr,
+                    "[DKG] Failed to send aggregated transcript to DKGManager, maybe DKGManager stopped and channel dropped: {:?}", e
+                );
             }
         };
         let (abort_handle, abort_registration) = AbortHandle::new_pair();
@@ -66,6 +91,8 @@ pub struct DummyAggTranscriptProducer {}
 impl<DKG: DKGTrait> TAggTranscriptProducer<DKG> for DummyAggTranscriptProducer {
     fn start_produce(
         &self,
+        _start_time: Duration,
+        _my_addr: AccountAddress,
         _epoch_state: Arc<EpochState>,
         _dkg_config: DKG::PublicParams,
         _agg_trx_tx: Option<Sender<(), DKG::Transcript>>,
diff --git a/dkg/src/counters.rs b/dkg/src/counters.rs
index 7c490550bed79..da128591e19b3 100644
--- a/dkg/src/counters.rs
+++ b/dkg/src/counters.rs
@@ -1,6 +1,6 @@
 // Copyright © Aptos Foundation
 
-use aptos_metrics_core::{register_int_gauge, IntGauge};
+use aptos_metrics_core::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge};
 use once_cell::sync::Lazy;
 
 /// Count of the pending messages sent to itself in the channel
@@ -11,3 +11,12 @@ pub static PENDING_SELF_MESSAGES: Lazy<IntGauge> = Lazy::new(|| {
     )
     .unwrap()
 });
+
+pub static DKG_STAGE_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
+    register_histogram_vec!(
+        "aptos_dkg_session_stage_seconds",
+        "How long it takes to reach different DKG stages",
+        &["dealer", "stage"]
+    )
+    .unwrap()
+});
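Note for reviewers: every stage measurement added below follows the same
pattern against this histogram. A minimal sketch (the `observe_stage` helper
is illustrative only, not part of this patch; the label order matches the
`dealer`/`stage` declaration above):

    use std::time::Duration;

    use aptos_infallible::duration_since_epoch;

    use crate::counters::DKG_STAGE_SECONDS;

    // Hypothetical helper showing the observation pattern used at each call
    // site in dkg_manager and transcript_aggregation below. `start_time` is
    // the DKG session start as a Duration since the unix epoch; `stage` is one
    // of "deal_start", "deal_finish", "first_valid_peer_transcript",
    // "agg_transcript_ready", "proposed", "epoch_change".
    fn observe_stage(my_addr_hex: &str, stage: &str, start_time: Duration) {
        let secs_since_dkg_start =
            duration_since_epoch().as_secs_f64() - start_time.as_secs_f64();
        DKG_STAGE_SECONDS
            .with_label_values(&[my_addr_hex, stage])
            .observe(secs_since_dkg_start);
    }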
diff --git a/dkg/src/dkg_manager/mod.rs b/dkg/src/dkg_manager/mod.rs
index c0193bc23ddb0..5a473dfbaffcc 100644
--- a/dkg/src/dkg_manager/mod.rs
+++ b/dkg/src/dkg_manager/mod.rs
@@ -1,9 +1,13 @@
 // Copyright © Aptos Foundation
-use crate::{agg_trx_producer::TAggTranscriptProducer, network::IncomingRpcRequest, DKGMessage};
+use crate::{
+    agg_trx_producer::TAggTranscriptProducer, counters::DKG_STAGE_SECONDS,
+    network::IncomingRpcRequest, DKGMessage,
+};
 use anyhow::{anyhow, bail, ensure, Result};
 use aptos_channels::{aptos_channel, message_queues::QueueStyle};
 use aptos_crypto::Uniform;
-use aptos_logger::error;
+use aptos_infallible::duration_since_epoch;
+use aptos_logger::{debug, error, info};
 use aptos_types::{
     dkg::{
         DKGSessionMetadata, DKGSessionState, DKGStartEvent, DKGTrait, DKGTranscript,
@@ -13,62 +17,35 @@ use aptos_types::{
     validator_txn::{Topic, ValidatorTransaction},
 };
 use aptos_validator_transaction_pool::{TxnGuard, VTxnPoolState};
+use fail::fail_point;
 use futures_channel::oneshot;
 use futures_util::{future::AbortHandle, FutureExt, StreamExt};
 use move_core_types::account_address::AccountAddress;
 use rand::{prelude::StdRng, thread_rng, SeedableRng};
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
-#[allow(dead_code)]
 #[derive(Clone, Debug)]
-enum InnerState<DKG: DKGTrait> {
+enum InnerState {
     NotStarted,
     InProgress {
-        start_time_us: u64,
-        public_params: DKG::PublicParams,
+        start_time: Duration,
         my_transcript: DKGTranscript,
         abort_handle: AbortHandle,
     },
     Finished {
         vtxn_guard: TxnGuard,
-        start_time_us: u64,
+        start_time: Duration,
         my_transcript: DKGTranscript,
-        pull_confirmed: bool,
+        proposed: bool,
     },
 }
 
-impl<DKG: DKGTrait> InnerState<DKG> {
-    fn variant_name(&self) -> &str {
-        match self {
-            InnerState::NotStarted => "NotStarted",
-            InnerState::InProgress { .. } => "InProgress",
-            InnerState::Finished { .. } => "Finished",
-        }
-    }
-
-    #[cfg(test)]
-    pub fn my_node_cloned(&self) -> DKGTranscript {
-        match self {
-            InnerState::NotStarted => panic!("my_node unavailable"),
-            InnerState::InProgress {
-                my_transcript: my_node,
-                ..
-            }
-            | InnerState::Finished {
-                my_transcript: my_node,
-                ..
-            } => my_node.clone(),
-        }
-    }
-}
-
-impl<DKG: DKGTrait> Default for InnerState<DKG> {
+impl Default for InnerState {
     fn default() -> Self {
         Self::NotStarted
     }
 }
 
-#[allow(dead_code)]
 pub struct DKGManager<DKG: DKGTrait> {
     dealer_sk: Arc<DKG::DealerPrivateKey>,
     my_index: usize,
@@ -85,7 +62,26 @@ pub struct DKGManager<DKG: DKGTrait> {
 
     // Control states.
     stopped: bool,
-    state: InnerState<DKG>,
+    state: InnerState,
+}
+
+impl InnerState {
+    fn variant_name(&self) -> &str {
+        match self {
+            InnerState::NotStarted => "NotStarted",
+            InnerState::InProgress { .. } => "InProgress",
+            InnerState::Finished { .. } => "Finished",
+        }
+    }
+
+    #[cfg(test)]
+    pub fn my_node_cloned(&self) -> DKGTranscript {
+        match self {
+            InnerState::NotStarted => panic!("my_node unavailable"),
+            InnerState::InProgress { my_transcript, .. }
+            | InnerState::Finished { my_transcript, .. } => my_transcript.clone(),
+        }
+    }
 }
 
 impl<DKG: DKGTrait> DKGManager<DKG> {
@@ -101,8 +97,8 @@ impl<DKG: DKGTrait> DKGManager<DKG> {
             aptos_channel::new(QueueStyle::KLAST, 1, None);
         Self {
             dealer_sk,
-            my_index,
             my_addr,
+            my_index,
             epoch_state,
             vtxn_pool,
             agg_trx_tx: None,
@@ -117,60 +113,128 @@ impl<DKG: DKGTrait> DKGManager<DKG> {
     pub async fn run(
         mut self,
         in_progress_session: Option<DKGSessionState>,
-        dkg_start_event_rx: oneshot::Receiver<DKGStartEvent>,
+        mut dkg_start_event_rx: aptos_channel::Receiver<(), DKGStartEvent>,
         mut rpc_msg_rx: aptos_channel::Receiver<
             AccountAddress,
             (AccountAddress, IncomingRpcRequest),
         >,
         close_rx: oneshot::Receiver<oneshot::Sender<()>>,
     ) {
+        info!(
+            epoch = self.epoch_state.epoch,
+            my_addr = self.my_addr.to_hex().as_str(),
+            "[DKG] DKGManager started."
+        );
+        let mut interval = tokio::time::interval(Duration::from_millis(5000));
+
+        let (agg_trx_tx, mut agg_trx_rx) = aptos_channel::new(QueueStyle::KLAST, 1, None);
+        self.agg_trx_tx = Some(agg_trx_tx);
+
         if let Some(session_state) = in_progress_session {
             let DKGSessionState {
-                metadata,
                 start_time_us,
+                metadata,
                 ..
             } = session_state;
-            self.setup_deal_broadcast(start_time_us, &metadata)
-                .await
-                .expect("setup_deal_broadcast() should be infallible");
-        }
 
-        let (agg_trx_tx, mut agg_trx_rx) = aptos_channel::new(QueueStyle::KLAST, 1, None);
-        self.agg_trx_tx = Some(agg_trx_tx);
+            if metadata.dealer_epoch == self.epoch_state.epoch {
+                info!(
+                    epoch = self.epoch_state.epoch,
+                    "Found unfinished and current DKG session. Continuing it."
+                );
+                self.setup_deal_broadcast(start_time_us, &metadata)
+                    .await
+                    .expect("[DKG] setup_deal_broadcast() should be infallible");
+            } else {
+                info!(
+                    cur_epoch = self.epoch_state.epoch,
+                    dealer_epoch = metadata.dealer_epoch,
+                    "Found unfinished but stale DKG session. Ignoring it."
+                );
+            }
+        }
 
-        let mut dkg_start_event_rx = dkg_start_event_rx.into_stream();
         let mut close_rx = close_rx.into_stream();
         while !self.stopped {
             let handling_result = tokio::select! {
                 dkg_start_event = dkg_start_event_rx.select_next_some() => {
-                    self.process_dkg_start_event(dkg_start_event.ok()).await
+                    self.process_dkg_start_event(dkg_start_event)
+                        .await
+                        .map_err(|e|anyhow!("[DKG] process_dkg_start_event failed: {e}"))
                 },
                 (_sender, msg) = rpc_msg_rx.select_next_some() => {
-                    self.process_peer_rpc_msg(msg).await
+                    self.process_peer_rpc_msg(msg)
+                        .await
+                        .map_err(|e|anyhow!("[DKG] process_peer_rpc_msg failed: {e}"))
                 },
-                agg_node = agg_trx_rx.select_next_some() => {
-                    self.process_aggregated_transcript(agg_node).await
+                agg_transcript = agg_trx_rx.select_next_some() => {
+                    self.process_aggregated_transcript(agg_transcript)
+                        .await
+                        .map_err(|e|anyhow!("[DKG] process_aggregated_transcript failed: {e}"))
+
                 },
                 dkg_txn = self.pull_notification_rx.select_next_some() => {
-                    self.process_dkg_txn_pulled_notification(dkg_txn).await
+                    self.process_dkg_txn_pulled_notification(dkg_txn)
+                        .await
+                        .map_err(|e|anyhow!("[DKG] process_dkg_txn_pulled_notification failed: {e}"))
                 },
                 close_req = close_rx.select_next_some() => {
                     self.process_close_cmd(close_req.ok())
-                }
+                },
+                _ = interval.tick().fuse() => {
+                    self.observe()
+                },
             };
 
             if let Err(e) = handling_result {
-                error!("{}", e);
+                error!(
+                    epoch = self.epoch_state.epoch,
+                    my_addr = self.my_addr.to_hex().as_str(),
+                    "[DKG] DKGManager handling error: {e}"
+                );
             }
         }
+        info!(
+            epoch = self.epoch_state.epoch,
+            my_addr = self.my_addr.to_hex().as_str(),
+            "[DKG] DKGManager finished."
+        );
+    }
+
+    fn observe(&self) -> Result<()> {
+        debug!("[DKG] dkg_manager_state={:?}", self.state);
+        Ok(())
     }
 
     /// On a CLOSE command from epoch manager, do clean-up.
     fn process_close_cmd(&mut self, ack_tx: Option<oneshot::Sender<()>>) -> Result<()> {
         self.stopped = true;
 
-        if let InnerState::InProgress { abort_handle, .. } = &self.state {
-            abort_handle.abort();
+        match std::mem::take(&mut self.state) {
+            InnerState::NotStarted => {},
+            InnerState::InProgress { abort_handle, .. } => {
+                abort_handle.abort();
+            },
+            InnerState::Finished {
+                vtxn_guard,
+                start_time,
+                ..
+            } => {
+                let epoch_change_time = duration_since_epoch();
+                let secs_since_dkg_start =
+                    epoch_change_time.as_secs_f64() - start_time.as_secs_f64();
+                DKG_STAGE_SECONDS
+                    .with_label_values(&[self.my_addr.to_hex().as_str(), "epoch_change"])
+                    .observe(secs_since_dkg_start);
+                info!(
+                    epoch = self.epoch_state.epoch,
+                    my_addr = self.my_addr,
+                    secs_since_dkg_start = secs_since_dkg_start,
+                    "[DKG] txn executed and entering new epoch.",
+                );
+
+                drop(vtxn_guard);
+            },
         }
 
         if let Some(tx) = ack_tx {
@@ -185,13 +249,33 @@ impl<DKG: DKGTrait> DKGManager<DKG> {
         &mut self,
         _txn: Arc<ValidatorTransaction>,
     ) -> Result<()> {
-        if let InnerState::Finished { pull_confirmed, .. } = &mut self.state {
-            if !*pull_confirmed {
-                // TODO(zjma): metric DKG_AGG_NODE_PROPOSED
-            }
-            *pull_confirmed = true;
+        match &mut self.state {
+            InnerState::Finished {
+                start_time,
+                proposed,
+                ..
+            } => {
+                if !*proposed {
+                    *proposed = true;
+                    let proposed_time = duration_since_epoch();
+                    let secs_since_dkg_start =
+                        proposed_time.as_secs_f64() - start_time.as_secs_f64();
+                    DKG_STAGE_SECONDS
+                        .with_label_values(&[self.my_addr.to_hex().as_str(), "proposed"])
+                        .observe(secs_since_dkg_start);
+                    info!(
+                        epoch = self.epoch_state.epoch,
+                        my_addr = self.my_addr,
+                        secs_since_dkg_start = secs_since_dkg_start,
+                        "[DKG] aggregated transcript proposed by consensus.",
+                    );
+                }
+                Ok(())
+            },
+            _ => {
+                bail!("[DKG] pull notification only expected in finished state");
+            },
         }
-        Ok(())
     }
 
     /// Calculate DKG config. Deal a transcript. Start broadcasting the transcript.
@@ -204,49 +288,69 @@ impl<DKG: DKGTrait> DKGManager<DKG> {
         start_time_us: u64,
         dkg_session_metadata: &DKGSessionMetadata,
     ) -> Result<()> {
-        self.state = match &self.state {
-            InnerState::NotStarted => {
-                let public_params = DKG::new_public_params(dkg_session_metadata);
-                let mut rng = if cfg!(feature = "smoke-test") {
-                    StdRng::from_seed(self.my_addr.into_bytes())
-                } else {
-                    StdRng::from_rng(thread_rng()).unwrap()
-                };
-                let input_secret = DKG::InputSecret::generate(&mut rng);
-
-                let trx = DKG::generate_transcript(
-                    &mut rng,
-                    &public_params,
-                    &input_secret,
-                    self.my_index as u64,
-                    &self.dealer_sk,
-                );
+        ensure!(
+            matches!(&self.state, InnerState::NotStarted),
+            "transcript already dealt"
+        );
+        let dkg_start_time = Duration::from_micros(start_time_us);
+        let deal_start = duration_since_epoch();
+        let secs_since_dkg_start = deal_start.as_secs_f64() - dkg_start_time.as_secs_f64();
+        DKG_STAGE_SECONDS
+            .with_label_values(&[self.my_addr.to_hex().as_str(), "deal_start"])
+            .observe(secs_since_dkg_start);
+        info!(
+            epoch = self.epoch_state.epoch,
+            my_addr = self.my_addr,
+            secs_since_dkg_start = secs_since_dkg_start,
+            "[DKG] Deal transcript started.",
+        );
+        let public_params = DKG::new_public_params(dkg_session_metadata);
+        let mut rng = if cfg!(feature = "smoke-test") {
+            StdRng::from_seed(self.my_addr.into_bytes())
+        } else {
+            StdRng::from_rng(thread_rng()).unwrap()
+        };
+        let input_secret = DKG::InputSecret::generate(&mut rng);
 
-                let dkg_transcript = DKGTranscript::new(
-                    self.epoch_state.epoch,
-                    self.my_addr,
-                    bcs::to_bytes(&trx).map_err(|e| {
-                        anyhow!("setup_deal_broadcast failed with trx serialization error: {e}")
-                    })?,
-                );
+        let trx = DKG::generate_transcript(
+            &mut rng,
+            &public_params,
+            &input_secret,
+            self.my_index as u64,
+            &self.dealer_sk,
+        );
 
-                // TODO(zjma): DKG_NODE_READY metric
+        let my_transcript = DKGTranscript::new(
+            self.epoch_state.epoch,
+            self.my_addr,
+            bcs::to_bytes(&trx).map_err(|e| anyhow!("transcript serialization error: {e}"))?,
+        );
 
-                let abort_handle = self.agg_trx_producer.start_produce(
-                    self.epoch_state.clone(),
-                    public_params.clone(),
-                    self.agg_trx_tx.clone(),
-                );
+        let deal_finish = duration_since_epoch();
+        let secs_since_dkg_start = deal_finish.as_secs_f64() - dkg_start_time.as_secs_f64();
+        DKG_STAGE_SECONDS
+            .with_label_values(&[self.my_addr.to_hex().as_str(), "deal_finish"])
+            .observe(secs_since_dkg_start);
+        info!(
+            epoch = self.epoch_state.epoch,
+            my_addr = self.my_addr,
+            secs_since_dkg_start = secs_since_dkg_start,
+            "[DKG] Deal transcript finished.",
+        );
 
-                // Switch to the next stage.
-                InnerState::InProgress {
-                    start_time_us,
-                    public_params,
-                    my_transcript: dkg_transcript,
-                    abort_handle,
-                }
-            },
-            _ => unreachable!(), // `setup_deal_broadcast` is called only when DKG state is `NotStarted`.
+        let abort_handle = self.agg_trx_producer.start_produce(
+            dkg_start_time,
+            self.my_addr,
+            self.epoch_state.clone(),
+            public_params.clone(),
+            self.agg_trx_tx.clone(),
+        );
+
+        // Switch to the next stage.
+        self.state = InnerState::InProgress {
+            start_time: dkg_start_time,
+            my_transcript,
+            abort_handle,
         };
 
         Ok(())
@@ -254,49 +358,75 @@ impl<DKG: DKGTrait> DKGManager<DKG> {
 
     /// On a locally aggregated transcript, put it into the validator txn pool and update inner states.
     async fn process_aggregated_transcript(&mut self, agg_trx: DKG::Transcript) -> Result<()> {
+        info!(
+            epoch = self.epoch_state.epoch,
+            my_addr = self.my_addr,
+            "[DKG] Processing locally aggregated transcript."
+        );
         self.state = match std::mem::take(&mut self.state) {
             InnerState::InProgress {
-                start_time_us,
-                my_transcript: my_node,
+                start_time,
+                my_transcript,
                 ..
             } => {
-                // TODO(zjma): metric DKG_AGG_NODE_READY
+                let agg_transcript_ready_time = duration_since_epoch();
+                let secs_since_dkg_start =
+                    agg_transcript_ready_time.as_secs_f64() - start_time.as_secs_f64();
+                DKG_STAGE_SECONDS
+                    .with_label_values(&[self.my_addr.to_hex().as_str(), "agg_transcript_ready"])
+                    .observe(secs_since_dkg_start);
+
                 let txn = ValidatorTransaction::DKGResult(DKGTranscript {
                     metadata: DKGTranscriptMetadata {
                         epoch: self.epoch_state.epoch,
                         author: self.my_addr,
                     },
-                    transcript_bytes: bcs::to_bytes(&agg_trx).map_err(|e|anyhow!("process_aggregated_transcript failed with trx serialization error: {e}"))?,
+                    transcript_bytes: bcs::to_bytes(&agg_trx)
+                        .map_err(|e| anyhow!("transcript serialization error: {e}"))?,
                 });
                 let vtxn_guard = self.vtxn_pool.put(
                     Topic::DKG,
                     Arc::new(txn),
                     Some(self.pull_notification_tx.clone()),
                 );
+                info!(
+                    epoch = self.epoch_state.epoch,
+                    my_addr = self.my_addr,
+                    "[DKG] aggregated transcript put into vtxn pool."
+                );
                 InnerState::Finished {
                     vtxn_guard,
-                    start_time_us,
-                    my_transcript: my_node,
-                    pull_confirmed: false,
+                    start_time,
+                    my_transcript,
+                    proposed: false,
                 }
             },
-            _ => bail!("process agg trx failed with invalid inner state"),
+            _ => bail!("[DKG] aggregated transcript only expected during DKG"),
         };
         Ok(())
     }
 
-    /// On a DKG start event, execute DKG.
-    async fn process_dkg_start_event(&mut self, maybe_event: Option<DKGStartEvent>) -> Result<()> {
-        if let Some(event) = maybe_event {
-            let DKGStartEvent {
-                session_metadata,
-                start_time_us,
-            } = event;
-            ensure!(self.epoch_state.epoch == session_metadata.dealer_epoch);
-            self.setup_deal_broadcast(start_time_us, &session_metadata)
-                .await?;
-        }
-        Ok(())
+    async fn process_dkg_start_event(&mut self, event: DKGStartEvent) -> Result<()> {
+        info!(
+            epoch = self.epoch_state.epoch,
+            my_addr = self.my_addr,
+            "[DKG] Processing DKGStart event."
+        );
+        fail_point!("dkg::process_dkg_start_event");
+        let DKGStartEvent {
+            session_metadata,
+            start_time_us,
+        } = event;
+        ensure!(
+            matches!(&self.state, InnerState::NotStarted),
+            "[DKG] dkg already started"
+        );
+        ensure!(
+            self.epoch_state.epoch == session_metadata.dealer_epoch,
+            "[DKG] event not for current epoch"
+        );
+        self.setup_deal_broadcast(start_time_us, &session_metadata)
+            .await
     }
 
     /// Process an RPC request from DKG peers.
@@ -306,24 +436,17 @@ impl<DKG: DKGTrait> DKGManager<DKG> {
             mut response_sender,
             ..
         } = req;
-        ensure!(msg.epoch() == self.epoch_state.epoch);
+        ensure!(
+            msg.epoch() == self.epoch_state.epoch,
+            "[DKG] msg not for current epoch"
+        );
         let response = match (&self.state, &msg) {
-            (
-                InnerState::Finished {
-                    my_transcript: my_node,
-                    ..
-                },
-                DKGMessage::NodeRequest(_),
-            )
-            | (
-                InnerState::InProgress {
-                    my_transcript: my_node,
-                    ..
-                },
-                DKGMessage::NodeRequest(_),
-            ) => Ok(DKGMessage::NodeResponse(my_node.clone())),
+            (InnerState::Finished { my_transcript, .. }, DKGMessage::TranscriptRequest(_))
+            | (InnerState::InProgress { my_transcript, .. }, DKGMessage::TranscriptRequest(_)) => {
+                Ok(DKGMessage::TranscriptResponse(my_transcript.clone()))
+            },
             _ => Err(anyhow!(
-                "msg {:?} unexpected in state {:?}",
+                "[DKG] msg {:?} unexpected in state {:?}",
                 msg.name(),
                 self.state.variant_name()
             )),
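Note for reviewers: the new `dkg::process_dkg_start_event` failpoint can be
driven from smoke tests via the `fail` crate. A minimal sketch, assuming a
harness that delivers a `DKGStartEvent` to a running manager (the test name
and body are illustrative, not part of this patch):

    #[test]
    fn dkg_start_failpoint_smoke() {
        // Enable failpoints for the duration of this test.
        let scenario = fail::FailScenario::setup();
        // Panic when the DKG manager reaches process_dkg_start_event, to
        // prove the event actually flows through; "off" re-disables it.
        fail::cfg("dkg::process_dkg_start_event", "panic").unwrap();
        // ... start a DKGManager and deliver a DKGStartEvent here ...
        scenario.teardown();
    }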
diff --git a/dkg/src/dkg_manager/tests.rs b/dkg/src/dkg_manager/tests.rs
index 8e48a6227df41..42ac02466185c 100644
--- a/dkg/src/dkg_manager/tests.rs
+++ b/dkg/src/dkg_manager/tests.rs
@@ -80,21 +80,26 @@ async fn test_dkg_state_transition() {
 
     // In state `NotStarted`, DKGManager should accept `DKGStartEvent`:
     // it should record start time, compute its own node, and enter state `InProgress`.
-    let handle_result = dkg_manager
-        .process_dkg_start_event(Some(DKGStartEvent {
-            session_metadata: DKGSessionMetadata {
-                dealer_epoch: 999,
-                dealer_validator_set: validator_consensus_info_move_structs.clone(),
-                target_validator_set: validator_consensus_info_move_structs.clone(),
-            },
-            start_time_us: 1700000000000000,
-        }))
-        .await;
+    let start_time_1 = Duration::from_secs(1700000000);
+    let event = DKGStartEvent {
+        session_metadata: DKGSessionMetadata {
+            dealer_epoch: 999,
+            dealer_validator_set: validator_consensus_info_move_structs.clone(),
+            target_validator_set: validator_consensus_info_move_structs.clone(),
+        },
+        start_time_us: start_time_1.as_micros() as u64,
+    };
+    let handle_result = dkg_manager.process_dkg_start_event(event.clone()).await;
     assert!(handle_result.is_ok());
     assert!(
-        matches!(&dkg_manager.state, InnerState::InProgress { start_time_us, my_transcript, .. } if *start_time_us == 1700000000000000 && my_transcript.metadata == DKGTranscriptMetadata{ epoch: 999, author: addrs[0]})
+        matches!(&dkg_manager.state, InnerState::InProgress { start_time, my_transcript, .. } if *start_time == start_time_1 && my_transcript.metadata == DKGTranscriptMetadata{ epoch: 999, author: addrs[0]})
     );
 
+    // 2nd `DKGStartEvent` should be rejected.
+    let handle_result = dkg_manager.process_dkg_start_event(event).await;
+    println!("{:?}", handle_result);
+    assert!(handle_result.is_err());
+
     // In state `InProgress`, DKGManager should respond to `DKGNodeRequest` with its own node.
     let rpc_node_request = new_rpc_node_request(999, addrs[3], rpc_response_collector.clone());
     let handle_result = dkg_manager.process_peer_rpc_msg(rpc_node_request).await;
@@ -104,7 +109,9 @@ async fn test_dkg_state_transition() {
         .map(anyhow::Result::unwrap)
         .collect::<Vec<_>>();
     assert_eq!(
-        vec![DKGMessage::NodeResponse(dkg_manager.state.my_node_cloned())],
+        vec![DKGMessage::TranscriptResponse(
+            dkg_manager.state.my_node_cloned()
+        )],
         last_responses
     );
     assert!(matches!(&dkg_manager.state, InnerState::InProgress { .. }));
@@ -143,7 +150,9 @@ async fn test_dkg_state_transition() {
         .map(anyhow::Result::unwrap)
         .collect::<Vec<_>>();
     assert_eq!(
-        vec![DKGMessage::NodeResponse(dkg_manager.state.my_node_cloned())],
+        vec![DKGMessage::TranscriptResponse(
+            dkg_manager.state.my_node_cloned()
+        )],
         last_responses
     );
     assert!(matches!(&dkg_manager.state, InnerState::Finished { .. }));
@@ -156,7 +165,7 @@ fn new_rpc_node_request(
     response_collector: Arc<RwLock<Vec<anyhow::Result<DKGMessage>>>>,
 ) -> IncomingRpcRequest {
     IncomingRpcRequest {
-        msg: DKGMessage::NodeRequest(DKGTranscriptRequest::new(epoch)),
+        msg: DKGMessage::TranscriptRequest(DKGTranscriptRequest::new(epoch)),
         sender,
         response_sender: Box::new(DummyRpcResponseSender::new(response_collector)),
     }
diff --git a/dkg/src/epoch_manager.rs b/dkg/src/epoch_manager.rs
index f5b210820ed2e..71634d7a8aae7 100644
--- a/dkg/src/epoch_manager.rs
+++ b/dkg/src/epoch_manager.rs
@@ -22,7 +22,8 @@ use aptos_types::{
     dkg::{DKGStartEvent, DKGState, DKGTrait, DefaultDKG},
     epoch_state::EpochState,
     on_chain_config::{
-        FeatureFlag, Features, OnChainConfigPayload, OnChainConfigProvider, ValidatorSet,
+        FeatureFlag, Features, OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig,
+        ValidatorSet,
     },
 };
 use aptos_validator_transaction_pool::VTxnPoolState;
@@ -45,7 +46,7 @@ pub struct EpochManager<P: OnChainConfigProvider> {
     dkg_rpc_msg_tx:
         Option<aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingRpcRequest)>>,
     dkg_manager_close_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
-    dkg_start_event_tx: Option<oneshot::Sender<DKGStartEvent>>,
+    dkg_start_event_tx: Option<aptos_channel::Sender<(), DKGStartEvent>>,
     vtxn_pool: VTxnPoolState,
 
     // Network utils
@@ -93,13 +94,13 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
     }
 
     fn on_dkg_start_notification(&mut self, notification: EventNotification) -> Result<()> {
-        if let Some(tx) = self.dkg_start_event_tx.take() {
+        if let Some(tx) = self.dkg_start_event_tx.as_ref() {
             let EventNotification {
                 subscribed_events, ..
             } = notification;
             for event in subscribed_events {
                 if let Ok(dkg_start_event) = DKGStartEvent::try_from(&event) {
-                    let _ = tx.send(dkg_start_event);
+                    let _ = tx.push((), dkg_start_event);
                     return Ok(());
                 } else {
                     debug!("[DKG] on_dkg_start_notification: failed in converting a contract event to a dkg start event!");
@@ -157,11 +158,16 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
             .copied();
 
         let features = payload.get::<Features>().unwrap_or_default();
+        let onchain_consensus_config: anyhow::Result<OnChainConsensusConfig> = payload.get();
+        if let Err(error) = &onchain_consensus_config {
+            error!("Failed to read on-chain consensus config {}", error);
+        }
+        let consensus_config = onchain_consensus_config.unwrap_or_default();
 
-        if let (true, Some(my_index)) = (
-            features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG),
-            my_index,
-        ) {
+        // Start DKG only if both validator transactions and the RECONFIGURE_WITH_DKG feature are enabled.
+        let randomness_enabled = consensus_config.is_vtxn_enabled()
+            && features.is_enabled(FeatureFlag::RECONFIGURE_WITH_DKG);
+        if let (true, Some(my_index)) = (randomness_enabled, my_index) {
             let DKGState {
                 in_progress: in_progress_session,
                 ..
@@ -178,7 +184,8 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
             );
             let agg_trx_producer = AggTranscriptProducer::new(rb);
 
-            let (dkg_start_event_tx, dkg_start_event_rx) = oneshot::channel();
+            let (dkg_start_event_tx, dkg_start_event_rx) =
+                aptos_channel::new(QueueStyle::KLAST, 1, None);
             self.dkg_start_event_tx = Some(dkg_start_event_tx);
 
             let (dkg_rpc_msg_tx, dkg_rpc_msg_rx) = aptos_channel::new::<
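Note for reviewers: the oneshot-to-aptos_channel switch above is what lets
DKGStart events keep flowing after the first one; with `QueueStyle::KLAST` and
a per-key capacity of 1, an undelivered event is simply replaced by a newer
one. A rough sketch of the semantics (the `u64` payload is a placeholder):

    use aptos_channels::{aptos_channel, message_queues::QueueStyle};

    fn klast_sketch() {
        let (tx, _rx) = aptos_channel::new::<(), u64>(QueueStyle::KLAST, 1, None);
        tx.push((), 1).unwrap();
        tx.push((), 2).unwrap(); // KLAST + capacity 1: replaces the undelivered 1
        // Polled as a stream, the receiver now yields only the latest value (2).
    }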
diff --git a/dkg/src/lib.rs b/dkg/src/lib.rs
index 9e62b71c8e35c..4b694eb980ce0 100644
--- a/dkg/src/lib.rs
+++ b/dkg/src/lib.rs
@@ -1,6 +1,7 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
+mod agg_trx_producer;
 mod counters;
 mod dkg_manager;
 pub mod epoch_manager;
@@ -49,5 +50,3 @@ pub fn start_dkg_runtime(
     runtime.spawn(dkg_epoch_manager.start(network_receiver));
     runtime
 }
-
-pub mod agg_trx_producer;
diff --git a/dkg/src/transcript_aggregation/mod.rs b/dkg/src/transcript_aggregation/mod.rs
index f0d896b2dfbf7..62d47817e17f4 100644
--- a/dkg/src/transcript_aggregation/mod.rs
+++ b/dkg/src/transcript_aggregation/mod.rs
@@ -1,16 +1,18 @@
 // Copyright © Aptos Foundation
 
-use crate::{types::DKGTranscriptRequest, DKGMessage};
-use anyhow::ensure;
+use crate::{counters::DKG_STAGE_SECONDS, types::DKGTranscriptRequest, DKGMessage};
+use anyhow::{anyhow, ensure};
 use aptos_consensus_types::common::Author;
-use aptos_infallible::Mutex;
+use aptos_infallible::{duration_since_epoch, Mutex};
+use aptos_logger::info;
 use aptos_reliable_broadcast::BroadcastStatus;
 use aptos_types::{
     dkg::{DKGTrait, DKGTranscript},
     epoch_state::EpochState,
+    validator_verifier::VerifyError,
 };
 use move_core_types::account_address::AccountAddress;
-use std::{collections::HashSet, sync::Arc};
+use std::{collections::HashSet, sync::Arc, time::Duration};
 
 pub struct TranscriptAggregator<S: DKGTrait> {
     pub contributors: HashSet<AccountAddress>,
@@ -27,15 +29,26 @@ impl<S: DKGTrait> Default for TranscriptAggregator<S> {
 }
 
 pub struct TranscriptAggregationState<DKG: DKGTrait> {
+    start_time: Duration,
+    my_addr: AccountAddress,
     trx_aggregator: Mutex<TranscriptAggregator<DKG>>,
     dkg_pub_params: DKG::PublicParams,
     epoch_state: Arc<EpochState>,
 }
 
 impl<DKG: DKGTrait> TranscriptAggregationState<DKG> {
-    pub fn new(dkg_pub_params: DKG::PublicParams, epoch_state: Arc<EpochState>) -> Self {
+    pub fn new(
+        start_time: Duration,
+        my_addr: AccountAddress,
+        dkg_pub_params: DKG::PublicParams,
+        epoch_state: Arc<EpochState>,
+    ) -> Self {
         //TODO(zjma): take DKG threshold as a parameter.
         Self {
+            start_time,
+            my_addr,
             trx_aggregator: Mutex::new(TranscriptAggregator::default()),
             dkg_pub_params,
             epoch_state,
@@ -59,33 +72,79 @@ impl<S: DKGTrait> BroadcastStatus<DKGMessage> for Arc<TranscriptAggregationState
         } = dkg_transcript;
         ensure!(
             metadata.epoch == self.epoch_state.epoch,
-            "adding dkg node failed with invalid node epoch",
+            "[DKG] adding peer transcript failed with invalid node epoch",
+        );
+
+        let peer_power = self.epoch_state.verifier.get_voting_power(&sender);
+        ensure!(
+            peer_power.is_some(),
+            "[DKG] adding peer transcript failed with illegal dealer"
         );
         ensure!(
             metadata.author == sender,
-            "adding dkg node failed with node author mismatch"
+            "[DKG] adding peer transcript failed with node author mismatch"
         );
-        let transcript = bcs::from_bytes(transcript_bytes.as_slice())?;
+        let transcript = bcs::from_bytes(transcript_bytes.as_slice()).map_err(|e| {
+            anyhow!("[DKG] adding peer transcript failed with trx deserialization error: {e}")
+        })?;
         let mut trx_aggregator = self.trx_aggregator.lock();
         if trx_aggregator.contributors.contains(&metadata.author) {
             return Ok(None);
         }
 
-        S::verify_transcript(&self.dkg_pub_params, &transcript)?;
+        S::verify_transcript(&self.dkg_pub_params, &transcript).map_err(|e| {
+            anyhow!("[DKG] adding peer transcript failed with trx verification failure: {e}")
+        })?;
 
         // All checks passed. Aggregating.
+        let is_self = self.my_addr == sender;
+        // `add` only has `&self`, so a plain bool field could never be
+        // flipped; instead, detect the first valid peer transcript from the
+        // aggregator state under the lock: no non-self contributor yet.
+        let is_first_peer_transcript = !is_self
+            && !trx_aggregator
+                .contributors
+                .iter()
+                .any(|addr| *addr != self.my_addr);
+        if is_first_peer_transcript {
+            let secs_since_dkg_start =
+                duration_since_epoch().as_secs_f64() - self.start_time.as_secs_f64();
+            DKG_STAGE_SECONDS
+                .with_label_values(&[
+                    self.my_addr.to_hex().as_str(),
+                    "first_valid_peer_transcript",
+                ])
+                .observe(secs_since_dkg_start);
+        }
+
         trx_aggregator.contributors.insert(metadata.author);
         if let Some(agg_trx) = trx_aggregator.trx.as_mut() {
             S::aggregate_transcripts(&self.dkg_pub_params, agg_trx, transcript);
         } else {
             trx_aggregator.trx = Some(transcript);
         }
-        let maybe_aggregated = self
+        let threshold = self.epoch_state.verifier.quorum_voting_power();
+        let power_check_result = self
             .epoch_state
             .verifier
-            .check_voting_power(trx_aggregator.contributors.iter(), true)
+            .check_voting_power(trx_aggregator.contributors.iter(), true);
+        let new_total_power = match &power_check_result {
+            Ok(x) => Some(*x),
+            Err(VerifyError::TooLittleVotingPower { voting_power, .. }) => Some(*voting_power),
+            _ => None,
+        };
+        let maybe_aggregated = power_check_result
             .ok()
-            .map(|_aggregated_voting_power| trx_aggregator.trx.clone().unwrap());
+            .map(|_| trx_aggregator.trx.clone().unwrap());
+        info!(
+            epoch = self.epoch_state.epoch,
+            peer = sender,
+            is_self = is_self,
+            peer_power = peer_power,
+            new_total_power = new_total_power,
+            threshold = threshold,
+            threshold_exceeded = maybe_aggregated.is_some(),
+            "[DKG] added transcript from validator {}, {} out of {} aggregated.",
+            self.epoch_state
+                .verifier
+                .address_to_validator_index()
+                .get(&sender)
+                .unwrap(),
+            new_total_power.unwrap_or(0),
+            threshold
+        );
         Ok(maybe_aggregated)
     }
 }
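Note for reviewers: the completion condition that the new
`new_total_power`/`threshold` logging reports on, as a sketch (the function
name is illustrative; the u128 power type and the `check_voting_power` call
mirror this patch):

    use std::collections::HashSet;

    use aptos_types::{epoch_state::EpochState, validator_verifier::VerifyError};
    use move_core_types::account_address::AccountAddress;

    // Returns Some(total_power) once contributors reach quorum, i.e. the same
    // condition under which `maybe_aggregated` above becomes Some.
    fn quorum_progress(
        epoch_state: &EpochState,
        contributors: &HashSet<AccountAddress>,
    ) -> Option<u128> {
        match epoch_state.verifier.check_voting_power(contributors.iter(), true) {
            // Accumulated power >= quorum_voting_power(): aggregation is done.
            Ok(total_power) => Some(total_power),
            // Below quorum: keep collecting peer transcripts.
            Err(VerifyError::TooLittleVotingPower { .. }) => None,
            // Any other verification error also means "not aggregated yet".
            Err(_) => None,
        }
    }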
diff --git a/dkg/src/transcript_aggregation/tests.rs b/dkg/src/transcript_aggregation/tests.rs
index eeb2e34dcfb50..96f6c0308ddfa 100644
--- a/dkg/src/transcript_aggregation/tests.rs
+++ b/dkg/src/transcript_aggregation/tests.rs
@@ -2,6 +2,7 @@
 
 use crate::transcript_aggregation::TranscriptAggregationState;
 use aptos_crypto::{bls12381::bls12381_keys, Uniform};
+use aptos_infallible::duration_since_epoch;
 use aptos_reliable_broadcast::BroadcastStatus;
 use aptos_types::{
     dkg::{
@@ -23,6 +24,7 @@ fn test_transcript_aggregation_state() {
     let addrs: Vec<AccountAddress> = (0..num_validators)
         .map(|_| AccountAddress::random())
         .collect();
+    let vfn_addr = AccountAddress::random();
     let private_keys: Vec<bls12381_keys::PrivateKey> = (0..num_validators)
         .map(|_| bls12381_keys::PrivateKey::generate_for_testing())
         .collect();
@@ -46,6 +48,8 @@ fn test_transcript_aggregation_state() {
     });
     let epoch_state = Arc::new(EpochState { epoch, verifier });
     let trx_agg_state = Arc::new(TranscriptAggregationState::<DummyDKG>::new(
+        duration_since_epoch(),
+        addrs[0],
         pub_params,
         epoch_state,
     ));
@@ -73,6 +77,16 @@ fn test_transcript_aggregation_state() {
     });
     assert!(result.is_err());
 
+    // Transcript authored by a non-active validator should be rejected.
+    let result = trx_agg_state.add(vfn_addr, DKGTranscript {
+        metadata: DKGTranscriptMetadata {
+            epoch: 999,
+            author: vfn_addr,
+        },
+        transcript_bytes: good_trx_bytes.clone(),
+    });
+    assert!(result.is_err());
+
     // Node with invalid transcript should be rejected.
     let mut bad_trx_bytes = good_trx_bytes.clone();
     bad_trx_bytes[0] = 0xAB;
diff --git a/dkg/src/types.rs b/dkg/src/types.rs
index 29172e48e05ad..928b659027278 100644
--- a/dkg/src/types.rs
+++ b/dkg/src/types.rs
@@ -24,22 +24,22 @@ impl DKGTranscriptRequest {
 /// The DKG network message.
 #[derive(Clone, Serialize, Deserialize, Debug, EnumConversion, PartialEq)]
 pub enum DKGMessage {
-    NodeRequest(DKGTranscriptRequest),
-    NodeResponse(DKGTranscript),
+    TranscriptRequest(DKGTranscriptRequest),
+    TranscriptResponse(DKGTranscript),
 }
 
 impl DKGMessage {
     pub fn epoch(&self) -> u64 {
         match self {
-            DKGMessage::NodeRequest(request) => request.dealer_epoch,
-            DKGMessage::NodeResponse(response) => response.metadata.epoch,
+            DKGMessage::TranscriptRequest(request) => request.dealer_epoch,
+            DKGMessage::TranscriptResponse(response) => response.metadata.epoch,
         }
     }
 
     pub fn name(&self) -> &str {
         match self {
-            DKGMessage::NodeRequest(_) => "DKGTranscriptRequest",
-            DKGMessage::NodeResponse(_) => "DKGTranscriptResponse",
+            DKGMessage::TranscriptRequest(_) => "DKGTranscriptRequest",
+            DKGMessage::TranscriptResponse(_) => "DKGTranscriptResponse",
         }
     }
 }