Skip to content

Commit

Permalink
Execute the block window.
Browse files Browse the repository at this point in the history
Doesn't have any optimizations.
  • Loading branch information
bchocho committed Oct 16, 2024
1 parent 7a3647b commit a0f2d39
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 74 deletions.
11 changes: 8 additions & 3 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,24 @@ use std::{

#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct OrderedBlockWindow {
blocks: Vec<Block>,
blocks: Vec<Arc<PipelinedBlock>>,
}

impl OrderedBlockWindow {
pub fn new(blocks: Vec<Block>) -> Self {
pub fn new(blocks: Vec<Arc<PipelinedBlock>>) -> Self {
Self { blocks }
}

pub fn empty() -> Self {
Self { blocks: vec![] }
}

pub fn blocks(&self) -> &Vec<Block> {
// TODO: clone required?
pub fn blocks(&self) -> Vec<Block> {
self.blocks.iter().map(|b| b.block().clone()).collect()
}

pub fn pipelined_blocks(&self) -> &Vec<Arc<PipelinedBlock>> {
&self.blocks
}
}
Expand Down
58 changes: 54 additions & 4 deletions consensus/src/block_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@

use crate::{
counters::{self, MAX_TXNS_FROM_BLOCK_TO_EXECUTE, TXN_SHUFFLE_SECONDS},
monitor,
payload_manager::TPayloadManager,
transaction_deduper::TransactionDeduper,
transaction_filter::TransactionFilter,
transaction_shuffler::TransactionShuffler,
};
use aptos_consensus_types::block::Block;
use aptos_consensus_types::{block::Block, pipelined_block::OrderedBlockWindow};
use aptos_executor_types::ExecutorResult;
use aptos_logger::info;
use aptos_types::transaction::SignedTransaction;
use fail::fail_point;
use futures::{stream::FuturesOrdered, StreamExt};
use std::{sync::Arc, time::Instant};

pub struct BlockPreparer {
Expand All @@ -36,16 +39,63 @@ impl BlockPreparer {
}
}

pub async fn prepare_block(&self, block: &Block) -> ExecutorResult<Vec<SignedTransaction>> {
async fn get_transactions(
&self,
block: &Block,
block_window: &OrderedBlockWindow,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
let mut txns = vec![];
let mut futures = FuturesOrdered::new();
for block in block_window
.pipelined_blocks()
.iter()
.map(|b| b.block())
.chain(std::iter::once(block))
{
futures.push_back(async move { self.payload_manager.get_transactions(block).await });
}
let mut max_txns_from_block_to_execute = None;
loop {
match futures.next().await {
// TODO: we are turning off the max txns from block to execute feature for now
Some(Ok((block_txns, _max_txns))) => {
txns.extend(block_txns);
max_txns_from_block_to_execute = None;
},
Some(Err(e)) => {
return Err(e);
},
None => break,
}
}
Ok((txns, max_txns_from_block_to_execute))
}

pub async fn prepare_block(
&self,
block: &Block,
block_window: &OrderedBlockWindow,
) -> ExecutorResult<Vec<SignedTransaction>> {
fail_point!("consensus::prepare_block", |_| {
use aptos_executor_types::ExecutorError;
use std::{thread, time::Duration};
thread::sleep(Duration::from_millis(10));
Err(ExecutorError::CouldNotGetData)
});
let start_time = Instant::now();
let (txns, max_txns_from_block_to_execute) =
self.payload_manager.get_transactions(block).await?;
info!(
"BlockPreparer: Preparing for block {} and window {:?}",
block.id(),
block_window
.blocks()
.iter()
.map(|b| b.id())
.collect::<Vec<_>>()
);

let (txns, max_txns_from_block_to_execute) = monitor!("get_transactions", {
self.get_transactions(block, block_window).await?
});
let txn_filter = self.txn_filter.clone();
let txn_deduper = self.txn_deduper.clone();
let txn_shuffler = self.txn_shuffler.clone();
Expand Down
11 changes: 9 additions & 2 deletions consensus/src/block_storage/block_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use std::{

/// This structure is a wrapper of [`ExecutedBlock`](aptos_consensus_types::pipelined_block::PipelinedBlock)
/// that adds `children` field to know the parent-child relationship between blocks.
// TODO: remove debug
#[derive(Debug)]
struct LinkableBlock {
/// Executed block that has raw block data and execution output.
executed_block: Arc<PipelinedBlock>,
Expand Down Expand Up @@ -294,7 +296,7 @@ impl BlockTree {
"Added block: {}, for window of block: {}",
current_block, block
);
window.push(current_block.clone());
window.push(parent_block);
} else {
info!(
"Visiting block: {} was not found, parent of block: {}, for window of block: {}",
Expand Down Expand Up @@ -426,20 +428,25 @@ impl BlockTree {

let mut blocks_pruned = VecDeque::new();
let mut blocks_to_be_pruned = vec![self.linkable_window_root()];
info!("blocks_to_be_pruned: {:?}", blocks_to_be_pruned);
while let Some(block_to_remove) = blocks_to_be_pruned.pop() {
info!("block_to_remove: {:?}", block_to_remove);
// Add the children to the blocks to be pruned (if any), but stop when it reaches the
// new root
for child_id in block_to_remove.children() {
info!(" child_id: {}", child_id);
if next_window_root_id == *child_id {
continue;
}
blocks_to_be_pruned.push(
self.get_linkable_block(child_id)
.expect("Child must exist in the tree"),
);
info!("blocks_to_be_pruned: {:?}", blocks_to_be_pruned);
}
// Track all the block ids removed
blocks_pruned.push_back(block_to_remove.id());
info!("blocks_pruned: {:?}", blocks_pruned);
}
blocks_pruned
}
Expand Down Expand Up @@ -587,7 +594,7 @@ impl BlockTree {
update_counters_for_committed_blocks(blocks_to_commit);
let current_round = self.commit_root().round();
let committed_round = block_to_commit.round();
debug!(
info!(
LogSchema::new(LogEvent::CommitViaBlock).round(current_round),
committed_round = committed_round,
block_id = block_to_commit.id(),
Expand Down
4 changes: 3 additions & 1 deletion consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ use aptos_bitvec::BitVec;
use aptos_consensus_types::{
block::Block,
common::{Author, Payload, Round},
pipelined_block::PipelinedBlock,
};
use aptos_executor_types::ExecutorResult;
use aptos_types::{aggregate_signature::AggregateSignature, transaction::SignedTransaction};
use async_trait::async_trait;
use std::sync::Arc;

pub(super) const TEST_DAG_WINDOW: u64 = 5;

Expand All @@ -25,7 +27,7 @@ pub(super) struct MockPayloadManager {}
impl TPayloadManager for MockPayloadManager {
fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}

fn notify_commit(&self, _block_timestamp: u64, _payloads: Vec<Payload>) {}
fn notify_commit(&self, _block_timestamp: u64, _blocks: &[Arc<PipelinedBlock>]) {}

fn check_payload_availability(&self, _block: &Block) -> Result<(), BitVec> {
unimplemented!()
Expand Down
11 changes: 9 additions & 2 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use crate::{
pipeline::pipeline_phase::CountedRequest,
state_computer::StateComputeResultFut,
};
use aptos_consensus_types::{block::Block, pipeline_execution_result::PipelineExecutionResult};
use aptos_consensus_types::{
block::Block, pipeline_execution_result::PipelineExecutionResult,
pipelined_block::OrderedBlockWindow,
};
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_checkpoint_output::StateCheckpointOutput, BlockExecutorTrait, ExecutorError,
Expand Down Expand Up @@ -90,6 +93,7 @@ impl ExecutionPipeline {
pub async fn queue(
&self,
block: Block,
block_window: OrderedBlockWindow,
metadata: BlockMetadataExt,
parent_block_id: HashValue,
txn_generator: BlockPreparer,
Expand All @@ -102,6 +106,7 @@ impl ExecutionPipeline {
self.prepare_block_tx
.send(PrepareBlockCommand {
block,
block_window,
metadata,
block_executor_onchain_config,
parent_block_id,
Expand Down Expand Up @@ -131,6 +136,7 @@ impl ExecutionPipeline {
) {
let PrepareBlockCommand {
block,
block_window,
metadata,
block_executor_onchain_config,
parent_block_id,
Expand All @@ -142,7 +148,7 @@ impl ExecutionPipeline {
} = command;
counters::PREPARE_BLOCK_WAIT_TIME.observe_duration(command_creation_time.elapsed());
debug!("prepare_block received block {}.", block.id());
let input_txns = block_preparer.prepare_block(&block).await;
let input_txns = block_preparer.prepare_block(&block, &block_window).await;
if let Err(e) = input_txns {
result_tx
.send(Err(e))
Expand Down Expand Up @@ -370,6 +376,7 @@ impl ExecutionPipeline {

struct PrepareBlockCommand {
block: Block,
block_window: OrderedBlockWindow,
metadata: BlockMetadataExt,
block_executor_onchain_config: BlockExecutorConfigFromOnchain,
// The parent block id.
Expand Down
Loading

0 comments on commit a0f2d39

Please sign in to comment.