Skip to content

Commit

Permalink
usize -> u64
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Jun 26, 2024
1 parent 620d4a5 commit 5bc70e3
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 31 deletions.
4 changes: 2 additions & 2 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub struct PipelineBackpressureValues {
// If we want to dynamically increase it beyond quorum_store_poll_time,
// we need to adjust timeouts other nodes use for the backpressured round.
pub backpressure_proposal_delay_ms: u64,
pub max_txns_from_block_to_execute: Option<usize>,
pub max_txns_from_block_to_execute: Option<u64>,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
Expand All @@ -157,7 +157,7 @@ pub struct ChainHealthBackoffValues {
pub max_sending_block_bytes_override: u64,

pub backoff_proposal_delay_ms: u64,
pub max_txns_from_block_to_execute: Option<usize>,
pub max_txns_from_block_to_execute: Option<u64>,
}

impl Default for ConsensusConfig {
Expand Down
10 changes: 5 additions & 5 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl ProofWithData {
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct ProofWithDataWithTxnLimit {
pub proof_with_data: ProofWithData,
pub max_txns_to_execute: Option<usize>,
pub max_txns_to_execute: Option<u64>,
}

impl PartialEq for ProofWithDataWithTxnLimit {
Expand All @@ -181,7 +181,7 @@ impl PartialEq for ProofWithDataWithTxnLimit {
impl Eq for ProofWithDataWithTxnLimit {}

impl ProofWithDataWithTxnLimit {
pub fn new(proof_with_data: ProofWithData, max_txns_to_execute: Option<usize>) -> Self {
pub fn new(proof_with_data: ProofWithData, max_txns_to_execute: Option<u64>) -> Self {
Self {
proof_with_data,
max_txns_to_execute,
Expand All @@ -197,7 +197,7 @@ impl ProofWithDataWithTxnLimit {
}
}

fn sum_max_txns_to_execute(m1: Option<usize>, m2: Option<usize>) -> Option<usize> {
fn sum_max_txns_to_execute(m1: Option<u64>, m2: Option<u64>) -> Option<u64> {
match (m1, m2) {
(None, _) => m2,
(_, None) => m1,
Expand All @@ -214,12 +214,12 @@ pub enum Payload {
QuorumStoreInlineHybrid(
Vec<(BatchInfo, Vec<SignedTransaction>)>,
ProofWithData,
Option<usize>,
Option<u64>,
),
}

impl Payload {
pub fn transform_to_quorum_store_v2(self, max_txns_to_execute: Option<usize>) -> Self {
pub fn transform_to_quorum_store_v2(self, max_txns_to_execute: Option<u64>) -> Self {
match self {
Payload::InQuorumStore(proof_with_status) => Payload::InQuorumStoreWithLimit(
ProofWithDataWithTxnLimit::new(proof_with_status, max_txns_to_execute),
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/block_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl BlockPreparer {
};

if let Some(max_txns_from_block_to_execute) = max_txns_from_block_to_execute {
shuffled_txns.truncate(max_txns_from_block_to_execute);
shuffled_txns.truncate(max_txns_from_block_to_execute as usize);
}
MAX_TXNS_FROM_BLOCK_TO_EXECUTE.observe(shuffled_txns.len() as f64);
Ok(shuffled_txns)
Expand Down
6 changes: 5 additions & 1 deletion consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,11 @@ impl BlockReader for BlockStore {
match cur_block {
Some(block) => {
if let Some(execution_time_and_size) = block.get_execution_time_and_size() {
info!("Found execution time for {}, {:?}", block.id(), execution_time_and_size );
info!(
"Found execution time for {}, {:?}",
block.id(),
execution_time_and_size
);
res.push(execution_time_and_size);
if res.len() >= num_blocks {
return res;
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/consensus_observer/network_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl ConsensusObserverMessage {
pub fn new_block_payload_message(
block: BlockInfo,
transactions: Vec<SignedTransaction>,
limit: Option<usize>,
limit: Option<u64>,
) -> ConsensusObserverDirectSend {
ConsensusObserverDirectSend::BlockPayload(BlockPayload {
block,
Expand Down Expand Up @@ -175,5 +175,5 @@ pub struct OrderedBlock {
pub struct BlockPayload {
pub block: BlockInfo,
pub transactions: Vec<SignedTransaction>,
pub limit: Option<usize>,
pub limit: Option<u64>,
}
4 changes: 2 additions & 2 deletions consensus/src/consensus_observer/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ use tokio_stream::wrappers::IntervalStream;
#[derive(Debug, Clone)]
pub struct BlockTransactionPayload {
pub transactions: Vec<SignedTransaction>,
pub limit: Option<usize>,
pub limit: Option<u64>,
}

impl BlockTransactionPayload {
pub fn new(transactions: Vec<SignedTransaction>, limit: Option<usize>) -> Self {
pub fn new(transactions: Vec<SignedTransaction>, limit: Option<u64>) -> Self {
Self {
transactions,
limit,
Expand Down
20 changes: 20 additions & 0 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,26 @@ pub static PROPOSER_DELAY_PROPOSAL: Lazy<Gauge> = Lazy::new(|| {
.unwrap()
});

/// Histogram for max number of transactions proposer uses when creating block.
pub static PROPOSER_MAX_BLOCK_TXNS: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_proposer_max_block_txns",
"Histogram for max number of transactions proposer uses when creating block.",
NUM_CONSENSUS_TRANSACTIONS_BUCKETS.to_vec()
)
.unwrap()
});

/// Histogram for max number of transactions to execute proposer uses when creating block.
pub static PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"aptos_proposer_max_block_txns_to_execute",
"Histogram for max number of transactions to execute proposer uses when creating block.",
NUM_CONSENSUS_TRANSACTIONS_BUCKETS.to_vec()
)
.unwrap()
});

/// How many pending blocks are there, when we make a proposal
pub static PROPOSER_PENDING_BLOCKS_COUNT: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
Expand Down
58 changes: 42 additions & 16 deletions consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ use super::{
use crate::{
block_storage::BlockReader,
counters::{
CHAIN_HEALTH_BACKOFF_TRIGGERED, EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, PROPOSER_DELAY_PROPOSAL, PROPOSER_PENDING_BLOCKS_COUNT, PROPOSER_PENDING_BLOCKS_FILL_FRACTION
CHAIN_HEALTH_BACKOFF_TRIGGERED, EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED,
PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED, PROPOSER_DELAY_PROPOSAL,
PROPOSER_MAX_BLOCK_TXNS, PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE, PROPOSER_PENDING_BLOCKS_COUNT,
PROPOSER_PENDING_BLOCKS_FILL_FRACTION,
},
payload_client::PayloadClient,
util::time_service::TimeService,
};
use anyhow::{bail, ensure, format_err, Context};
use aptos_config::config::{
ChainHealthBackoffValues, PipelineBackpressureValues, ExecutionBackpressureConfig,
ChainHealthBackoffValues, ExecutionBackpressureConfig, PipelineBackpressureValues,
};
use aptos_consensus_types::{
block::Block,
Expand Down Expand Up @@ -152,7 +155,10 @@ impl PipelineBackpressureConfig {
&self,
block_execution_times: &[(u64, Duration)],
) -> Option<u64> {
info!("Estimated block execution times: {:?}", block_execution_times);
info!(
"Estimated block execution times: {:?}",
block_execution_times
);

self.execution.as_ref().and_then(|config| {
let sizes = block_execution_times
Expand Down Expand Up @@ -360,6 +366,10 @@ impl ProposalGenerator {
self.calculate_max_block_sizes(voting_power_ratio, timestamp, round)
.await;

PROPOSER_MAX_BLOCK_TXNS.observe(max_block_txns as f64);
PROPOSER_MAX_BLOCK_TXNS_TO_EXECUTE
.observe(max_txns_from_block_to_execute.unwrap_or(max_block_txns) as f64);

PROPOSER_DELAY_PROPOSAL.set(proposal_delay.as_secs_f64());
if !proposal_delay.is_zero() {
tokio::time::sleep(proposal_delay).await;
Expand Down Expand Up @@ -410,7 +420,7 @@ impl ProposalGenerator {

if !payload.is_direct()
&& max_txns_from_block_to_execute.is_some()
&& payload.len() > max_txns_from_block_to_execute.unwrap()
&& payload.len() as u64 > max_txns_from_block_to_execute.unwrap()
{
payload = payload.transform_to_quorum_store_v2(max_txns_from_block_to_execute);
}
Expand Down Expand Up @@ -454,7 +464,7 @@ impl ProposalGenerator {
voting_power_ratio: f64,
timestamp: Duration,
round: Round,
) -> (u64, u64, Option<usize>, Duration) {
) -> (u64, u64, Option<u64>, Duration) {
let mut values_max_block_txns = vec![self.max_block_txns];
let mut values_max_block_bytes = vec![self.max_block_bytes];
let mut values_proposal_delay = vec![Duration::ZERO];
Expand All @@ -476,7 +486,9 @@ impl ProposalGenerator {
}

let pipeline_pending_latency = self.block_store.pipeline_pending_latency(timestamp);
let pipeline_backpressure = self.pipeline_backpressure_config.get_backoff(pipeline_pending_latency);
let pipeline_backpressure = self
.pipeline_backpressure_config
.get_backoff(pipeline_pending_latency);
if let Some(value) = pipeline_backpressure {
values_max_block_txns.push(value.max_sending_block_txns_override);
values_max_block_bytes.push(value.max_sending_block_bytes_override);
Expand All @@ -489,37 +501,51 @@ impl ProposalGenerator {
PIPELINE_BACKPRESSURE_ON_PROPOSAL_TRIGGERED.observe(0.0);
};

let mut execution_backpressure_applied = 0.0;
let mut execution_backpressure_applied = false;
if let Some(config) = &self.pipeline_backpressure_config.execution {
if pipeline_pending_latency.as_millis() > config.back_pressure_pipeline_latency_limit_ms as u128 {
if pipeline_pending_latency.as_millis()
> config.back_pressure_pipeline_latency_limit_ms as u128
{
let execution_backpressure = self
.pipeline_backpressure_config
.get_execution_block_size_backoff(
&self
.block_store
.get_recent_block_execution_times(config.num_blocks_to_look_at),
);
if let Some(execution_backpressure_block_size) =
execution_backpressure
{
values_max_block_txns.push((execution_backpressure_block_size as f64 * config.reordering_ovarpacking_factor.max(1.0)) as u64);
if let Some(execution_backpressure_block_size) = execution_backpressure {
values_max_block_txns.push(
(execution_backpressure_block_size as f64
* config.reordering_ovarpacking_factor.max(1.0))
as u64,
);
values_max_txns_from_block_to_execute.push(execution_backpressure_block_size);
execution_backpressure_applied = 1.0;
execution_backpressure_applied = true;
}
}
}
EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED.observe(execution_backpressure_applied);
EXECUTION_BACKPRESSURE_ON_PROPOSAL_TRIGGERED.observe(
if execution_backpressure_applied {
1.0
} else {
0.0
},
);

let max_block_txns = values_max_block_txns.into_iter().min().unwrap();
let max_block_bytes = values_max_block_bytes.into_iter().min().unwrap();
let proposal_delay = values_proposal_delay.into_iter().max().unwrap();
let max_txns_from_block_to_execute =
values_max_txns_from_block_to_execute.into_iter().min();
if pipeline_backpressure.is_some() || chain_health_backoff.is_some() || execution_backpressure_applied {
if pipeline_backpressure.is_some()
|| chain_health_backoff.is_some()
|| execution_backpressure_applied
{
warn!(
proposal_delay_ms = proposal_delay.as_millis(),
max_block_txns = max_block_txns,
max_txns_from_block_to_execute = max_txns_from_block_to_execute.unwrap_or(max_block_txns),
max_txns_from_block_to_execute =
max_txns_from_block_to_execute.unwrap_or(max_block_txns),
max_block_bytes = max_block_bytes,
is_pipeline_backpressure = pipeline_backpressure.is_some(),
is_execution_backpressure = execution_backpressure_applied,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl PayloadManager {
pub async fn get_transactions(
&self,
block: &Block,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<usize>)> {
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
let payload = match block.payload() {
Some(p) => p,
None => return Ok((Vec::new(), None)),
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/quorum_store/tests/proof_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn get_proposal(
fn assert_payload_response(
payload: Payload,
expected: &[ProofOfStore],
max_txns_from_block_to_execute: Option<usize>,
max_txns_from_block_to_execute: Option<u64>,
) {
match payload {
Payload::InQuorumStore(proofs) => {
Expand Down

0 comments on commit 5bc70e3

Please sign in to comment.