diff --git a/Cargo.lock b/Cargo.lock index 392e67e5ab5b9..562ca57978877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1177,6 +1177,7 @@ name = "aptos-executor-service" version = "0.1.0" dependencies = [ "anyhow", + "aptos-block-partitioner", "aptos-config", "aptos-crypto", "aptos-executor-types", @@ -1185,11 +1186,16 @@ dependencies = [ "aptos-retrier", "aptos-secure-net", "aptos-state-view", + "aptos-storage-interface", "aptos-types", "aptos-vm", "bcs 0.1.4", "clap 4.3.5", + "crossbeam-channel", "itertools", + "num_cpus", + "rand 0.7.3", + "rayon", "serde 1.0.149", "serde_json", "thiserror", @@ -2955,6 +2961,9 @@ dependencies = [ "aptos-config", "aptos-logger", "aptos-metrics-core", + "aptos-retrier", + "bcs 0.1.4", + "crossbeam-channel", "once_cell", "serde 1.0.149", "thiserror", @@ -3543,6 +3552,7 @@ dependencies = [ "aptos-vm-logging", "aptos-vm-types", "bcs 0.1.4", + "crossbeam-channel", "dashmap", "fail 0.5.0", "futures", diff --git a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs index 1dca4d75ac684..09366c1f8caaf 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transactions.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transactions.rs @@ -6,7 +6,6 @@ use aptos_bitvec::BitVec; use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook; use aptos_block_partitioner::sharded_block_partitioner::ShardedBlockPartitioner; use aptos_crypto::HashValue; -use aptos_executor_service::remote_executor_client::RemoteExecutorClient; use aptos_language_e2e_tests::{ account_universe::{AUTransactionGen, AccountPickStyle, AccountUniverse, AccountUniverseGen}, data_store::FakeDataStore, @@ -22,7 +21,10 @@ use aptos_types::{ use aptos_vm::{ block_executor::{AptosTransactionOutput, BlockAptosVM}, data_cache::AsMoveResolver, - sharded_block_executor::{block_executor_client::VMExecutorClient, ShardedBlockExecutor}, + sharded_block_executor::{ + local_executor_shard::{LocalExecutorClient, LocalExecutorService}, + ShardedBlockExecutor, + }, }; use criterion::{measurement::Measurement, BatchSize, Bencher}; use once_cell::sync::Lazy; @@ -192,7 +194,8 @@ struct TransactionBenchState { num_transactions: usize, strategy: S, account_universe: AccountUniverse, - parallel_block_executor: Option>>, + parallel_block_executor: + Option>>>, block_partitioner: Option, validator_set: ValidatorSet, state_view: Arc, @@ -228,7 +231,8 @@ where universe_strategy: impl Strategy, num_transactions: usize, num_executor_shards: usize, - remote_executor_addresses: Option>, + // TODO(skedia): add support for remote executor addresses. 
+ _remote_executor_addresses: Option>, ) -> Self { let mut runner = TestRunner::default(); let universe_gen = universe_strategy @@ -246,18 +250,9 @@ where let (parallel_block_executor, block_partitioner) = if num_executor_shards == 1 { (None, None) } else { - let parallel_block_executor = - if let Some(remote_executor_addresses) = remote_executor_addresses { - let remote_executor_clients = remote_executor_addresses - .into_iter() - .map(|addr| RemoteExecutorClient::new(addr, 10000)) - .collect::>(); - Arc::new(ShardedBlockExecutor::new(remote_executor_clients)) - } else { - let local_executor_client = - VMExecutorClient::create_vm_clients(num_executor_shards, None); - Arc::new(ShardedBlockExecutor::new(local_executor_client)) - }; + let client = + LocalExecutorService::setup_local_executor_shards(num_executor_shards, None); + let parallel_block_executor = Arc::new(ShardedBlockExecutor::new(client)); ( Some(parallel_block_executor), Some(ShardedBlockPartitioner::new(num_executor_shards)), diff --git a/aptos-move/aptos-vm/Cargo.toml b/aptos-move/aptos-vm/Cargo.toml index 60a3ae0b2b48c..5c3c8f6eaeeca 100644 --- a/aptos-move/aptos-vm/Cargo.toml +++ b/aptos-move/aptos-vm/Cargo.toml @@ -33,6 +33,7 @@ aptos-utils = { workspace = true } aptos-vm-logging = { workspace = true } aptos-vm-types = { workspace = true } bcs = { workspace = true } +crossbeam-channel = { workspace = true } dashmap = { workspace = true } fail = { workspace = true } futures = { workspace = true } diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index 270d2c40cdd75..d3a292429222d 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -12,7 +12,7 @@ use crate::{ data_cache::StorageAdapter, errors::expect_only_successful_execution, move_vm_ext::{MoveResolverExt, RespawnedSession, SessionExt, SessionId}, - sharded_block_executor::ShardedBlockExecutor, + sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, system_module_names::*, transaction_metadata::TransactionMetadata, verifier, VMExecutor, VMValidator, @@ -1527,8 +1527,8 @@ impl VMExecutor for AptosVM { ret } - fn execute_block_sharded( - sharded_block_executor: &ShardedBlockExecutor, + fn execute_block_sharded>( + sharded_block_executor: &ShardedBlockExecutor, transactions: Vec>, state_view: Arc, maybe_block_gas_limit: Option, diff --git a/aptos-move/aptos-vm/src/lib.rs b/aptos-move/aptos-vm/src/lib.rs index 3377c4d572526..ce3a742905701 100644 --- a/aptos-move/aptos-vm/src/lib.rs +++ b/aptos-move/aptos-vm/src/lib.rs @@ -123,7 +123,7 @@ mod transaction_validation; mod verifier; pub use crate::aptos_vm::AptosVM; -use crate::sharded_block_executor::ShardedBlockExecutor; +use crate::sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}; use aptos_state_view::StateView; use aptos_types::{ block_executor::partitioner::SubBlocksForShard, @@ -161,8 +161,8 @@ pub trait VMExecutor: Send + Sync { ) -> Result, VMStatus>; /// Executes a block of transactions using a sharded block executor and returns the results. 
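The generic parameters in the hunk below do not survive rendering; spelled out with the bounds used by the `AptosVM` implementation above, the new sharded entry point reads roughly as follows (a reconstruction; the type-parameter names are a best guess consistent with the `ExecutorClient` trait this diff introduces):

```rust
fn execute_block_sharded<S: StateView + Sync + Send + 'static, C: ExecutorClient<S>>(
    sharded_block_executor: &ShardedBlockExecutor<S, C>,
    block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
    state_view: Arc<S>,
    maybe_block_gas_limit: Option<u64>,
) -> Result<Vec<TransactionOutput>, VMStatus>;
```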
- fn execute_block_sharded( - sharded_block_executor: &ShardedBlockExecutor, + fn execute_block_sharded>( + sharded_block_executor: &ShardedBlockExecutor, block: Vec>, state_view: Arc, maybe_block_gas_limit: Option, diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/block_executor_client.rs b/aptos-move/aptos-vm/src/sharded_block_executor/block_executor_client.rs deleted file mode 100644 index f3333cdc7ede2..0000000000000 --- a/aptos-move/aptos-vm/src/sharded_block_executor/block_executor_client.rs +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright © Aptos Foundation - -use crate::block_executor::{AptosTransactionOutput, BlockAptosVM}; -use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook; -use aptos_state_view::StateView; -use aptos_types::{ - block_executor::partitioner::SubBlocksForShard, - transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput}, -}; -use move_core_types::vm_status::VMStatus; -use std::sync::Arc; - -pub trait BlockExecutorClient { - fn execute_block( - &self, - transactions: SubBlocksForShard, - state_view: &S, - concurrency_level: usize, - maybe_block_gas_limit: Option, - ) -> Result>, VMStatus>; -} - -impl BlockExecutorClient for VMExecutorClient { - fn execute_block( - &self, - sub_blocks: SubBlocksForShard, - state_view: &S, - concurrency_level: usize, - maybe_block_gas_limit: Option, - ) -> Result>, VMStatus> { - let txns = sub_blocks - .into_txns() - .into_iter() - .map(|txn| txn.into_txn()) - .collect(); - Ok(vec![BlockAptosVM::execute_block::< - _, - NoOpTransactionCommitHook, - >( - self.executor_thread_pool.clone(), - txns, - state_view, - concurrency_level, - maybe_block_gas_limit, - None, - )?]) - } -} - -pub struct VMExecutorClient { - executor_thread_pool: Arc, -} - -impl VMExecutorClient { - pub fn new(num_threads: usize) -> Self { - let executor_thread_pool = Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(num_threads) - .build() - .unwrap(), - ); - - Self { - executor_thread_pool, - } - } - - pub fn create_vm_clients(num_shards: usize, num_threads: Option) -> Vec { - let num_threads = num_threads - .unwrap_or_else(|| (num_cpus::get() as f64 / num_shards as f64).ceil() as usize); - (0..num_shards) - .map(|_| VMExecutorClient::new(num_threads)) - .collect() - } -} diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/coordinator_client.rs b/aptos-move/aptos-vm/src/sharded_block_executor/coordinator_client.rs new file mode 100644 index 0000000000000..50403e13928a2 --- /dev/null +++ b/aptos-move/aptos-vm/src/sharded_block_executor/coordinator_client.rs @@ -0,0 +1,14 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::sharded_block_executor::ExecutorShardCommand; +use aptos_state_view::StateView; +use aptos_types::transaction::TransactionOutput; +use move_core_types::vm_status::VMStatus; + +// Interface to communicate from the executor shards to the block executor coordinator. 
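The trait that follows is the shard-side endpoint of the command protocol. As a concrete reference, the `LocalCoordinatorClient` added later in this diff realizes it with nothing more than a pair of crossbeam channels; a minimal sketch along those lines (`ChannelCoordinatorClient` is a hypothetical name, and the generic bounds are reconstructed from the local implementation):

```rust
use aptos_state_view::StateView;
use aptos_types::transaction::TransactionOutput;
use aptos_vm::sharded_block_executor::{
    coordinator_client::CoordinatorClient, ExecutorShardCommand,
};
use crossbeam_channel::{Receiver, Sender};
use move_core_types::vm_status::VMStatus;

// Shard-side endpoint backed by in-process channels.
struct ChannelCoordinatorClient<S> {
    command_rx: Receiver<ExecutorShardCommand<S>>,
    result_tx: Sender<Result<Vec<Vec<TransactionOutput>>, VMStatus>>,
}

impl<S: StateView + Sync + Send + 'static> CoordinatorClient<S> for ChannelCoordinatorClient<S> {
    fn receive_execute_command(&self) -> ExecutorShardCommand<S> {
        // Blocks until the coordinator dispatches the next command.
        self.command_rx.recv().unwrap()
    }

    fn send_execution_result(&self, result: Result<Vec<Vec<TransactionOutput>>, VMStatus>) {
        self.result_tx.send(result).unwrap()
    }
}
```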
+pub trait CoordinatorClient: Send + Sync { + fn receive_execute_command(&self) -> ExecutorShardCommand; + + fn send_execution_result(&self, result: Result>, VMStatus>); +} diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/cross_shard_client.rs b/aptos-move/aptos-vm/src/sharded_block_executor/cross_shard_client.rs index b3b56068f9733..f97a032da83ad 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/cross_shard_client.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/cross_shard_client.rs @@ -19,10 +19,7 @@ use aptos_types::{ }; use std::{ collections::{HashMap, HashSet}, - sync::{ - mpsc::{Receiver, Sender}, - Arc, Mutex, - }, + sync::Arc, }; pub struct CrossShardCommitReceiver {} @@ -30,17 +27,19 @@ pub struct CrossShardCommitReceiver {} impl CrossShardCommitReceiver { pub fn start( cross_shard_state_view: Arc>, - message_rx: &Receiver, + cross_shard_client: Arc, + round: RoundId, ) { loop { - let msg = message_rx.recv().unwrap(); + let msg = cross_shard_client.receive_cross_shard_msg(round); match msg { - CrossShardMsg::RemoteTxnWriteMsg(txn_commit_msg) => { + RemoteTxnWriteMsg(txn_commit_msg) => { let (state_key, write_op) = txn_commit_msg.take(); cross_shard_state_view .set_value(&state_key, write_op.and_then(|w| w.as_state_value())); }, CrossShardMsg::StopMsg => { + trace!("Cross shard commit receiver stopped for round {}", round); break; }, } @@ -50,8 +49,7 @@ impl CrossShardCommitReceiver { pub struct CrossShardCommitSender { shard_id: ShardId, - // The senders of cross-shard messages to other shards per round. - message_txs: Arc>>>>, + cross_shard_client: Arc, // The hashmap of source txn index to hashmap of conflicting storage location to the // list shard id and round id. Please note that the transaction indices stored here is // global indices, so we need to convert the local index received from the parallel execution to @@ -65,7 +63,7 @@ pub struct CrossShardCommitSender { impl CrossShardCommitSender { pub fn new( shard_id: ShardId, - message_txs: Arc>>>>, + cross_shard_client: Arc, sub_block: &SubBlock, ) -> Self { let mut dependent_edges = HashMap::new(); @@ -98,7 +96,7 @@ impl CrossShardCommitSender { Self { shard_id, - message_txs, + cross_shard_client, dependent_edges, index_offset: sub_block.start_index as TxnIndex, } @@ -121,11 +119,11 @@ impl CrossShardCommitSender { state_key.clone(), Some(write_op.clone()), )); - self.message_txs[*dependent_shard_id][*round_id] - .lock() - .unwrap() - .send(message) - .unwrap(); + self.cross_shard_client.send_cross_shard_msg( + *dependent_shard_id, + *round_id, + message, + ); } } } @@ -146,3 +144,11 @@ impl TransactionCommitHook for CrossShardCommitSender { todo!("on_transaction_aborted not supported for sharded execution yet") } } + +// CrossShardClient is a trait that defines the interface for sending and receiving messages across +// shards. 
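The round parameter in the trait below matters: a shard can be sent a remote write that will only be consumed in a later round, so channels are keyed by round as well as by destination shard (the comment in local_executor_shard.rs further down makes the same point). A sketch of the wiring this implies; `MatrixCrossShardClient` is a hypothetical name, and `LocalCrossShardClient` below is the real in-process version:

```rust
use aptos_types::block_executor::partitioner::{RoundId, ShardId};
use aptos_vm::sharded_block_executor::{
    cross_shard_client::CrossShardClient, messages::CrossShardMsg,
};
use crossbeam_channel::{Receiver, Sender};

// One sender per (destination shard, round), one receiver per round.
struct MatrixCrossShardClient {
    message_txs: Vec<Vec<Sender<CrossShardMsg>>>, // indexed as [shard][round]
    message_rxs: Vec<Receiver<CrossShardMsg>>,    // indexed as [round]
}

impl CrossShardClient for MatrixCrossShardClient {
    fn send_cross_shard_msg(&self, shard_id: ShardId, round: RoundId, msg: CrossShardMsg) {
        self.message_txs[shard_id][round].send(msg).unwrap()
    }

    fn receive_cross_shard_msg(&self, current_round: RoundId) -> CrossShardMsg {
        // Draining only the current round's receiver keeps a shard from
        // consuming messages intended for a different round.
        self.message_rxs[current_round].recv().unwrap()
    }
}
```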
+pub trait CrossShardClient: Send + Sync {
+    fn send_cross_shard_msg(&self, shard_id: ShardId, round: RoundId, msg: CrossShardMsg);
+
+    fn receive_cross_shard_msg(&self, current_round: RoundId) -> CrossShardMsg;
+}
diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/executor_client.rs b/aptos-move/aptos-vm/src/sharded_block_executor/executor_client.rs
new file mode 100644
index 0000000000000..09697e7c4e5ca
--- /dev/null
+++ b/aptos-move/aptos-vm/src/sharded_block_executor/executor_client.rs
@@ -0,0 +1,28 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use aptos_state_view::StateView;
+use aptos_types::{
+    block_executor::partitioner::SubBlocksForShard,
+    transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput},
+};
+use move_core_types::vm_status::VMStatus;
+use std::sync::Arc;
+
+// Interface to communicate from the block executor coordinator to the executor shards.
+pub trait ExecutorClient<S: StateView + Sync + Send + 'static>: Send + Sync {
+    fn num_shards(&self) -> usize;
+
+    // A non-blocking call that sends the block to be executed by the executor shards.
+    fn execute_block(
+        &self,
+        state_view: Arc<S>,
+        block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
+        concurrency_level_per_shard: usize,
+        maybe_block_gas_limit: Option<u64>,
+    );
+
+    // Blocking call that waits for the execution results from the executor shards. It returns the
+    // execution results from each shard in sub-block order.
+    fn get_execution_result(&self) -> Result<Vec<Vec<Vec<TransactionOutput>>>, VMStatus>;
+}
diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/executor_shard.rs b/aptos-move/aptos-vm/src/sharded_block_executor/executor_shard.rs
deleted file mode 100644
index 49c41a74fa688..0000000000000
--- a/aptos-move/aptos-vm/src/sharded_block_executor/executor_shard.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-// Copyright © Aptos Foundation
-// Parts of the project are originally copyright © Meta Platforms, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-use crate::sharded_block_executor::{
-    block_executor_client::BlockExecutorClient, ExecutorShardCommand,
-};
-use aptos_logger::trace;
-use aptos_state_view::StateView;
-use aptos_types::transaction::TransactionOutput;
-use move_core_types::vm_status::VMStatus;
-use std::sync::mpsc::{Receiver, Sender};
-
-/// A remote block executor that receives transactions from a channel and executes them in parallel.
-/// Currently it runs in the local machine and it will be further extended to run in a remote machine.
-pub struct ExecutorShard { - num_shards: usize, - shard_id: usize, - executor_client: E, - command_rx: Receiver>, - result_tx: Sender>, VMStatus>>, -} - -impl ExecutorShard { - pub fn new( - num_shards: usize, - executor_client: E, - shard_id: usize, - command_rx: Receiver>, - result_tx: Sender>, VMStatus>>, - ) -> Self { - Self { - num_shards, - shard_id, - executor_client, - command_rx, - result_tx, - } - } - - pub fn start(&self) { - trace!( - "Shard starting, shard_id={}, num_shards={}.", - self.shard_id, - self.num_shards - ); - loop { - let command = self.command_rx.recv().unwrap(); - match command { - ExecutorShardCommand::ExecuteSubBlocks( - state_view, - transactions, - concurrency_level_per_shard, - maybe_block_gas_limit, - ) => { - trace!( - "Shard {} received ExecuteBlock command of block size {} ", - self.shard_id, - transactions.num_txns() - ); - let ret = self.executor_client.execute_block( - transactions, - state_view.as_ref(), - concurrency_level_per_shard, - maybe_block_gas_limit, - ); - drop(state_view); - self.result_tx.send(ret).unwrap(); - }, - ExecutorShardCommand::Stop => { - break; - }, - } - } - trace!("Shard {} is shutting down", self.shard_id); - } -} diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/local_executor_shard.rs b/aptos-move/aptos-vm/src/sharded_block_executor/local_executor_shard.rs new file mode 100644 index 0000000000000..5ae345322f837 --- /dev/null +++ b/aptos-move/aptos-vm/src/sharded_block_executor/local_executor_shard.rs @@ -0,0 +1,231 @@ +// Copyright © Aptos Foundation + +use crate::sharded_block_executor::{ + coordinator_client::CoordinatorClient, cross_shard_client::CrossShardClient, + executor_client::ExecutorClient, messages::CrossShardMsg, + sharded_executor_service::ShardedExecutorService, ExecutorShardCommand, +}; +use aptos_block_partitioner::sharded_block_partitioner::MAX_ALLOWED_PARTITIONING_ROUNDS; +use aptos_logger::trace; +use aptos_state_view::StateView; +use aptos_types::{ + block_executor::partitioner::{RoundId, ShardId, SubBlocksForShard}, + transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput}, +}; +use crossbeam_channel::{unbounded, Receiver, Sender}; +use move_core_types::vm_status::VMStatus; +use std::{sync::Arc, thread}; + +/// Executor service that runs on local machine and waits for commands from the coordinator and executes +/// them in parallel. 
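Callers are expected to go through `setup_local_executor_shards` rather than construct the service directly, then wrap the returned client in a `ShardedBlockExecutor`, exactly as the benchmark and tests in this diff do. A sketch, with `InMemoryStateView` standing in for any `StateView` implementation:

```rust
use aptos_state_view::in_memory_state_view::InMemoryStateView;
use aptos_vm::sharded_block_executor::{
    local_executor_shard::{LocalExecutorClient, LocalExecutorService},
    ShardedBlockExecutor,
};

// Spin up 8 in-process shards; passing None for the thread count lets the
// service split the available cores evenly (num_cpus / num_shards, rounded up).
fn build_local_executor(
) -> ShardedBlockExecutor<InMemoryStateView, LocalExecutorClient<InMemoryStateView>> {
    let client = LocalExecutorService::setup_local_executor_shards(8, None);
    ShardedBlockExecutor::new(client)
}
```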
+pub struct LocalExecutorService { + join_handle: Option>, + phantom: std::marker::PhantomData, +} + +impl LocalExecutorService { + fn new( + shard_id: ShardId, + num_shards: usize, + num_threads: usize, + command_rx: Receiver>, + result_tx: Sender>, VMStatus>>, + cross_shard_client: LocalCrossShardClient, + ) -> Self { + let coordinator_client = Arc::new(LocalCoordinatorClient::new(command_rx, result_tx)); + let executor_service = Arc::new(ShardedExecutorService::new( + shard_id, + num_shards, + num_threads, + coordinator_client, + Arc::new(cross_shard_client), + )); + let join_handle = thread::Builder::new() + .name(format!("executor-shard-{}", shard_id)) + .spawn(move || executor_service.start()) + .unwrap(); + Self { + join_handle: Some(join_handle), + phantom: std::marker::PhantomData, + } + } + + pub fn setup_local_executor_shards( + num_shards: usize, + num_threads: Option, + ) -> LocalExecutorClient { + let num_threads = num_threads + .unwrap_or_else(|| (num_cpus::get() as f64 / num_shards as f64).ceil() as usize); + let (command_txs, command_rxs): ( + Vec>>, + Vec>>, + ) = (0..num_shards).map(|_| unbounded()).unzip(); + let (result_txs, result_rxs): ( + Vec>, VMStatus>>>, + Vec>, VMStatus>>>, + ) = (0..num_shards).map(|_| unbounded()).unzip(); + // We need to create channels for each shard and each round. This is needed because individual + // shards might send cross shard messages to other shards that will be consumed in different rounds. + // Having a single channel per shard will cause a shard to receiver messages that is not intended in the current round. + let (cross_shard_msg_txs, cross_shard_msg_rxs): ( + Vec>>, + Vec>>, + ) = (0..num_shards) + .map(|_| { + (0..MAX_ALLOWED_PARTITIONING_ROUNDS) + .map(|_| unbounded()) + .unzip() + }) + .unzip(); + let executor_shards = command_rxs + .into_iter() + .zip(result_txs.into_iter()) + .zip(cross_shard_msg_rxs.into_iter()) + .enumerate() + .map(|(shard_id, ((command_rx, result_tx), cross_shard_rxs))| { + let cross_shard_client = + LocalCrossShardClient::new(cross_shard_msg_txs.clone(), cross_shard_rxs); + Self::new( + shard_id as ShardId, + num_shards, + num_threads, + command_rx, + result_tx, + cross_shard_client, + ) + }) + .collect(); + LocalExecutorClient::new(command_txs, result_rxs, executor_shards) + } +} + +pub struct LocalExecutorClient { + // Channels to send execute block commands to the executor shards. + command_txs: Vec>>, + // Channels to receive execution results from the executor shards. 
+ result_rxs: Vec>, VMStatus>>>, + + executor_services: Vec>, +} + +impl LocalExecutorClient { + pub fn new( + command_tx: Vec>>, + result_rx: Vec>, VMStatus>>>, + executor_shards: Vec>, + ) -> Self { + Self { + command_txs: command_tx, + result_rxs: result_rx, + executor_services: executor_shards, + } + } +} + +impl ExecutorClient for LocalExecutorClient { + fn num_shards(&self) -> usize { + self.command_txs.len() + } + + fn execute_block( + &self, + state_view: Arc, + block: Vec>, + concurrency_level_per_shard: usize, + maybe_block_gas_limit: Option, + ) { + assert_eq!(block.len(), self.num_shards()); + for (i, sub_blocks_for_shard) in block.into_iter().enumerate() { + self.command_txs[i] + .send(ExecutorShardCommand::ExecuteSubBlocks( + state_view.clone(), + sub_blocks_for_shard, + concurrency_level_per_shard, + maybe_block_gas_limit, + )) + .unwrap(); + } + } + + fn get_execution_result(&self) -> Result>>, VMStatus> { + trace!("LocalExecutorClient Waiting for results"); + let mut results = vec![]; + for (i, rx) in self.result_rxs.iter().enumerate() { + results.push( + rx.recv() + .unwrap_or_else(|_| panic!("Did not receive output from shard {}", i))?, + ); + } + Ok(results) + } +} + +impl Drop for LocalExecutorClient { + fn drop(&mut self) { + for command_tx in self.command_txs.iter() { + let _ = command_tx.send(ExecutorShardCommand::Stop); + } + + // wait for join handles to finish + for executor_service in self.executor_services.iter_mut() { + let _ = executor_service.join_handle.take().unwrap().join(); + } + } +} + +pub struct LocalCoordinatorClient { + command_rx: Receiver>, + // Channel to send execution results to the coordinator. + result_tx: Sender>, VMStatus>>, +} + +impl LocalCoordinatorClient { + pub fn new( + command_rx: Receiver>, + result_tx: Sender>, VMStatus>>, + ) -> Self { + Self { + command_rx, + result_tx, + } + } +} + +impl CoordinatorClient for LocalCoordinatorClient { + fn receive_execute_command(&self) -> ExecutorShardCommand { + self.command_rx.recv().unwrap() + } + + fn send_execution_result(&self, result: Result>, VMStatus>) { + self.result_tx.send(result).unwrap() + } +} + +pub struct LocalCrossShardClient { + // The senders of cross-shard messages to other shards per round. + message_txs: Vec>>, + // The receivers of cross shard messages from other shards per round. 
+ message_rxs: Vec>, +} + +impl LocalCrossShardClient { + pub fn new( + cross_shard_txs: Vec>>, + cross_shard_rxs: Vec>, + ) -> Self { + Self { + message_txs: cross_shard_txs, + message_rxs: cross_shard_rxs, + } + } +} + +impl CrossShardClient for LocalCrossShardClient { + fn send_cross_shard_msg(&self, shard_id: ShardId, round: RoundId, msg: CrossShardMsg) { + self.message_txs[shard_id][round].send(msg).unwrap() + } + + fn receive_cross_shard_msg(&self, current_round: RoundId) -> CrossShardMsg { + self.message_rxs[current_round].recv().unwrap() + } +} diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/messages.rs b/aptos-move/aptos-vm/src/sharded_block_executor/messages.rs index ae5b635deeeb6..4dd711474c5dc 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/messages.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/messages.rs @@ -1,14 +1,15 @@ // Copyright © Aptos Foundation use aptos_types::{state_store::state_key::StateKey, write_set::WriteOp}; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub enum CrossShardMsg { RemoteTxnWriteMsg(RemoteTxnWrite), StopMsg, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct RemoteTxnWrite { state_key: StateKey, // The write op is None if the transaction is aborted. diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/mod.rs b/aptos-move/aptos-vm/src/sharded_block_executor/mod.rs index 46b40710cff96..be41bcf57bfd4 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/mod.rs @@ -2,40 +2,34 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::sharded_block_executor::{counters::NUM_EXECUTOR_SHARDS, executor_shard::ExecutorShard}; -use aptos_logger::{error, info, trace}; +use crate::sharded_block_executor::{ + counters::NUM_EXECUTOR_SHARDS, executor_client::ExecutorClient, +}; +use aptos_logger::{info, trace}; use aptos_state_view::StateView; use aptos_types::{ block_executor::partitioner::SubBlocksForShard, transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput}, }; -use block_executor_client::BlockExecutorClient; use move_core_types::vm_status::VMStatus; -use std::{ - marker::PhantomData, - sync::{ - mpsc::{Receiver, Sender}, - Arc, - }, - thread, -}; +use std::{marker::PhantomData, sync::Arc}; -pub mod block_executor_client; +pub mod coordinator_client; mod counters; -mod cross_shard_client; +pub mod cross_shard_client; mod cross_shard_state_view; -mod executor_shard; -mod messages; -pub mod sharded_executor_client; +pub mod executor_client; +pub mod local_executor_shard; +pub mod messages; +pub mod sharded_executor_service; +#[cfg(test)] +mod test_utils; #[cfg(test)] mod tests; -/// A wrapper around sharded block executors that manages multiple shards and aggregates the results. -pub struct ShardedBlockExecutor { - num_executor_shards: usize, - command_txs: Vec>>, - shard_threads: Vec>, - result_rxs: Vec>, VMStatus>>>, +/// Coordinator for sharded block executors that manages multiple shards and aggregates the results. 
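End to end, the coordinator defined below is driven the way the tests added further down drive it: partition the block into per-shard sub-blocks, submit them, and collect the aggregated outputs in the original transaction order. A sketch; the partitioning arguments (2 rounds, 0.9 cross-shard cutoff) are the values the tests use, not required constants:

```rust
use aptos_block_partitioner::sharded_block_partitioner::ShardedBlockPartitioner;
use aptos_state_view::StateView;
use aptos_types::transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput};
use aptos_vm::sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor};
use std::sync::Arc;

fn run_block<S, C>(
    executor: &ShardedBlockExecutor<S, C>,
    transactions: Vec<AnalyzedTransaction>,
    state_view: Arc<S>,
) -> Vec<TransactionOutput>
where
    S: StateView + Sync + Send + 'static,
    C: ExecutorClient<S>,
{
    // Partition into one sub-block stream per shard, then execute.
    let partitioner = ShardedBlockPartitioner::new(executor.num_shards());
    let sub_blocks = partitioner.partition(transactions, 2, 0.9);
    executor
        .execute_block(state_view, sub_blocks, /* concurrency per shard */ 2, None)
        .unwrap()
}
```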
+pub struct ShardedBlockExecutor> { + executor_client: C, phantom: PhantomData, } @@ -49,38 +43,22 @@ pub enum ExecutorShardCommand { Stop, } -impl ShardedBlockExecutor { - pub fn new(executor_clients: Vec) -> Self { - let mut command_txs = vec![]; - let mut result_rxs = vec![]; - let mut shard_join_handles = vec![]; - let num_executor_shards = executor_clients.len(); - for (i, executor_client) in executor_clients.into_iter().enumerate() { - let (transactions_tx, transactions_rx) = std::sync::mpsc::channel(); - let (result_tx, result_rx) = std::sync::mpsc::channel(); - command_txs.push(transactions_tx); - result_rxs.push(result_rx); - shard_join_handles.push(spawn_executor_shard( - num_executor_shards, - executor_client, - i, - transactions_rx, - result_tx, - )); - } +impl> ShardedBlockExecutor { + pub fn new(executor_client: C) -> Self { info!( "Creating a new ShardedBlockExecutor with {} shards", - num_executor_shards + executor_client.num_shards() ); Self { - num_executor_shards, - command_txs, - shard_threads: shard_join_handles, - result_rxs, + executor_client, phantom: PhantomData, } } + pub fn num_shards(&self) -> usize { + self.executor_client.num_shards() + } + /// Execute a block of transactions in parallel by splitting the block into num_remote_executors partitions and /// dispatching each partition to a remote executor shard. pub fn execute_block( @@ -90,37 +68,29 @@ impl ShardedBlockExecutor { concurrency_level_per_shard: usize, maybe_block_gas_limit: Option, ) -> Result, VMStatus> { - NUM_EXECUTOR_SHARDS.set(self.num_executor_shards as i64); + let num_executor_shards = self.executor_client.num_shards(); + NUM_EXECUTOR_SHARDS.set(num_executor_shards as i64); assert_eq!( - self.num_executor_shards, + num_executor_shards, block.len(), "Block must be partitioned into {} sub-blocks", - self.num_executor_shards + num_executor_shards + ); + self.executor_client.execute_block( + state_view, + block, + concurrency_level_per_shard, + maybe_block_gas_limit, ); - for (i, sub_blocks_for_shard) in block.into_iter().enumerate() { - self.command_txs[i] - .send(ExecutorShardCommand::ExecuteSubBlocks( - state_view.clone(), - sub_blocks_for_shard, - concurrency_level_per_shard, - maybe_block_gas_limit, - )) - .unwrap(); - } // wait for all remote executors to send the result back and append them in order by shard id - let mut results = vec![]; - trace!("ShardedBlockExecutor Waiting for results"); - for i in 0..self.num_executor_shards { - let result = self.result_rxs[i].recv().unwrap(); - results.push(result?); - } + let results = self.executor_client.get_execution_result()?; trace!("ShardedBlockExecutor Received all results"); let num_rounds = results[0].len(); let mut aggreate_results = vec![]; - let mut ordered_results = vec![vec![]; self.num_executor_shards * num_rounds]; + let mut ordered_results = vec![vec![]; num_executor_shards * num_rounds]; for (shard_id, results_from_shard) in results.into_iter().enumerate() { for (round, result) in results_from_shard.into_iter().enumerate() { - ordered_results[round * self.num_executor_shards + shard_id] = result; + ordered_results[round * num_executor_shards + shard_id] = result; } } @@ -131,48 +101,3 @@ impl ShardedBlockExecutor { Ok(aggreate_results) } } - -impl Drop for ShardedBlockExecutor { - /// Best effort stops all the executor shards and waits for the thread to finish. 
- fn drop(&mut self) { - // send stop command to all executor shards - for command_tx in self.command_txs.iter() { - if let Err(e) = command_tx.send(ExecutorShardCommand::Stop) { - error!("Failed to send stop command to executor shard: {:?}", e); - } - } - - // wait for all executor shards to stop - for shard_thread in self.shard_threads.drain(..) { - shard_thread.join().unwrap_or_else(|e| { - error!("Failed to join executor shard thread: {:?}", e); - }); - } - } -} - -fn spawn_executor_shard< - S: StateView + Sync + Send + 'static, - E: BlockExecutorClient + Sync + Send + 'static, ->( - num_executor_shards: usize, - executor_client: E, - shard_id: usize, - command_rx: Receiver>, - result_tx: Sender>, VMStatus>>, -) -> thread::JoinHandle<()> { - // create and start a new executor shard in a separate thread - thread::Builder::new() - .name(format!("executor-shard-{}", shard_id)) - .spawn(move || { - let executor_shard = ExecutorShard::new( - num_executor_shards, - executor_client, - shard_id, - command_rx, - result_tx, - ); - executor_shard.start(); - }) - .unwrap() -} diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_client.rs b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs similarity index 52% rename from aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_client.rs rename to aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs index dc387c780215a..16776e9c9f68e 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_client.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/sharded_executor_service.rs @@ -3,14 +3,14 @@ use crate::{ block_executor::BlockAptosVM, sharded_block_executor::{ - block_executor_client::BlockExecutorClient, + coordinator_client::CoordinatorClient, counters::SHARDED_BLOCK_EXECUTION_SECONDS, - cross_shard_client::{CrossShardCommitReceiver, CrossShardCommitSender}, + cross_shard_client::{CrossShardClient, CrossShardCommitReceiver, CrossShardCommitSender}, cross_shard_state_view::CrossShardStateView, messages::CrossShardMsg, + ExecutorShardCommand, }, }; -use aptos_block_partitioner::sharded_block_partitioner::MAX_ALLOWED_PARTITIONING_ROUNDS; use aptos_logger::{info, trace}; use aptos_state_view::StateView; use aptos_types::{ @@ -19,29 +19,23 @@ use aptos_types::{ }; use futures::{channel::oneshot, executor::block_on}; use move_core_types::vm_status::VMStatus; -use std::{ - collections::HashSet, - sync::{ - mpsc::{Receiver, Sender}, - Arc, Mutex, - }, -}; +use std::{collections::HashSet, sync::Arc}; -pub struct ShardedExecutorClient { +pub struct ShardedExecutorService { shard_id: ShardId, + num_shards: usize, executor_thread_pool: Arc, - // The receivers of cross shard messages from other shards per round. - message_rxs: Arc>>>, - // The senders of cross-shard messages to other shards per round. 
- message_txs: Arc>>>>, + coordinator_client: Arc>, + cross_shard_client: Arc, } -impl ShardedExecutorClient { +impl ShardedExecutorService { pub fn new( shard_id: ShardId, + num_shards: usize, num_threads: usize, - message_rxs: Vec>, - message_txs: Vec>>, + coordinator_client: Arc>, + cross_shard_client: Arc, ) -> Self { let executor_thread_pool = Arc::new( rayon::ThreadPoolBuilder::new() @@ -53,51 +47,14 @@ impl ShardedExecutorClient { ); Self { shard_id, + num_shards, executor_thread_pool, - message_rxs: Arc::new(message_rxs.into_iter().map(Mutex::new).collect()), - message_txs: Arc::new( - message_txs - .into_iter() - .map(|inner_vec| inner_vec.into_iter().map(Mutex::new).collect()) - .collect(), - ), - } - } - - pub fn create_sharded_executor_clients( - num_shards: usize, - num_threads: Option, - ) -> Vec { - let num_threads = num_threads - .unwrap_or_else(|| (num_cpus::get() as f64 / num_shards as f64).ceil() as usize); - let mut cross_shard_msg_txs = vec![]; - let mut cross_shard_msg_rxs = vec![]; - for _ in 0..num_shards { - let mut current_shard_msg_txs = vec![]; - let mut current_shard_msg_rxs = vec![]; - for _ in 0..MAX_ALLOWED_PARTITIONING_ROUNDS { - let (messages_tx, messages_rx) = std::sync::mpsc::channel(); - current_shard_msg_txs.push(messages_tx); - current_shard_msg_rxs.push(messages_rx); - } - cross_shard_msg_txs.push(current_shard_msg_txs); - cross_shard_msg_rxs.push(current_shard_msg_rxs); + coordinator_client, + cross_shard_client, } - cross_shard_msg_rxs - .into_iter() - .enumerate() - .map(|(shard_id, rxs)| { - Self::new( - shard_id as ShardId, - num_threads, - rxs, - cross_shard_msg_txs.clone(), - ) - }) - .collect() } - fn create_cross_shard_state_view<'a, S: StateView + Sync + Send>( + fn create_cross_shard_state_view<'a>( &self, base_view: &'a S, sub_block: &SubBlock, @@ -113,7 +70,7 @@ impl ShardedExecutorClient { CrossShardStateView::new(self.shard_id, cross_shard_state_key, base_view) } - fn execute_sub_block( + fn execute_sub_block( &self, sub_block: SubBlock, round: usize, @@ -127,31 +84,22 @@ impl ShardedExecutorClient { round ); let cross_shard_commit_sender = - CrossShardCommitSender::new(self.shard_id, self.message_txs.clone(), &sub_block); + CrossShardCommitSender::new(self.shard_id, self.cross_shard_client.clone(), &sub_block); let (callback, callback_receiver) = oneshot::channel(); - let message_rxs = self.message_rxs.clone(); - let self_message_tx = Arc::new(Mutex::new( - self.message_txs[self.shard_id][round] - .lock() - .unwrap() - .clone(), - )); let cross_shard_state_view = Arc::new(self.create_cross_shard_state_view(state_view, &sub_block)); - let cross_shard_state_view_clone1 = cross_shard_state_view.clone(); + let cross_shard_state_view_clone = cross_shard_state_view.clone(); + let cross_shard_client = self.cross_shard_client.clone(); + let cross_shard_client_clone = cross_shard_client.clone(); self.executor_thread_pool.scope(|s| { s.spawn(move |_| { - if round != 0 { - // If this is not the first round, start the cross-shard commit receiver. - // this is a bit ugly, we can get rid of this when we have round number - // information in the cross shard dependencies. 
- CrossShardCommitReceiver::start( - cross_shard_state_view_clone1, - &message_rxs[round].lock().unwrap(), - ); - } + CrossShardCommitReceiver::start( + cross_shard_state_view_clone, + cross_shard_client, + round, + ); }); s.spawn(move |_| { let ret = BlockAptosVM::execute_block( @@ -166,29 +114,24 @@ impl ShardedExecutorClient { maybe_block_gas_limit, Some(cross_shard_commit_sender), ); - // Send a stop command to the cross-shard commit receiver. - if round != 0 { - self_message_tx - .lock() - .unwrap() - .send(CrossShardMsg::StopMsg) - .unwrap(); - } + trace!( + "executed sub block for shard {} and round {}", + self.shard_id, + round + ); + // Send a self message to stop the cross-shard commit receiver. + cross_shard_client_clone.send_cross_shard_msg( + self.shard_id, + round, + CrossShardMsg::StopMsg, + ); callback.send(ret).unwrap(); }); }); - let ret = block_on(callback_receiver).unwrap(); - trace!( - "finished executing sub block for shard {} and round {}", - self.shard_id, - round - ); - ret + block_on(callback_receiver).unwrap() } -} -impl BlockExecutorClient for ShardedExecutorClient { - fn execute_block( + fn execute_block( &self, transactions: SubBlocksForShard, state_view: &S, @@ -213,7 +156,49 @@ impl BlockExecutorClient for ShardedExecutorClient { concurrency_level, maybe_block_gas_limit, )?); + trace!( + "Finished executing sub block for shard {} and round {}", + self.shard_id, + round + ); } Ok(result) } + + pub fn start(&self) { + trace!( + "Shard starting, shard_id={}, num_shards={}.", + self.shard_id, + self.num_shards + ); + loop { + let command = self.coordinator_client.receive_execute_command(); + match command { + ExecutorShardCommand::ExecuteSubBlocks( + state_view, + transactions, + concurrency_level_per_shard, + maybe_block_gas_limit, + ) => { + trace!( + "Shard {} received ExecuteBlock command of block size {} ", + self.shard_id, + transactions.num_txns() + ); + let ret = self.execute_block( + transactions, + state_view.as_ref(), + concurrency_level_per_shard, + maybe_block_gas_limit, + ); + drop(state_view); + self.coordinator_client.send_execution_result(ret); + }, + ExecutorShardCommand::Stop => { + break; + }, + } + } + trace!("Shard {} is shutting down", self.shard_id); + } } diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs b/aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs new file mode 100644 index 0000000000000..90c30dcfc8b95 --- /dev/null +++ b/aptos-move/aptos-vm/src/sharded_block_executor/test_utils.rs @@ -0,0 +1,232 @@ +// Copyright © Aptos Foundation + +use crate::{ + sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor}, + AptosVM, VMExecutor, +}; +use aptos_block_partitioner::sharded_block_partitioner::ShardedBlockPartitioner; +use aptos_crypto::hash::CryptoHash; +use aptos_language_e2e_tests::{ + account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore, + executor::FakeExecutor, +}; +use aptos_types::{ + block_executor::partitioner::SubBlocksForShard, + state_store::state_key::StateKeyInner, + transaction::{analyzed_transaction::AnalyzedTransaction, Transaction, TransactionOutput}, +}; +use move_core_types::account_address::AccountAddress; +use rand::{rngs::OsRng, Rng}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +pub fn generate_account_at(executor: &mut FakeExecutor, address: AccountAddress) -> AccountData { + executor.new_account_data_at(address) +} + +fn generate_non_conflicting_sender_receiver( + executor: &mut FakeExecutor, +) -> 
(AccountData, AccountData) { + let sender = executor.create_raw_account_data(3_000_000_000, 0); + let receiver = executor.create_raw_account_data(3_000_000_000, 0); + executor.add_account_data(&sender); + executor.add_account_data(&receiver); + (sender, receiver) +} + +pub fn generate_non_conflicting_p2p( + executor: &mut FakeExecutor, +) -> (AnalyzedTransaction, AccountData, AccountData) { + let (mut sender, receiver) = generate_non_conflicting_sender_receiver(executor); + let transfer_amount = 1_000; + let txn = generate_p2p_txn(&mut sender, &receiver, transfer_amount); + // execute transaction + (txn, sender, receiver) +} + +pub fn generate_p2p_txn( + sender: &mut AccountData, + receiver: &AccountData, + transfer_amount: u64, +) -> AnalyzedTransaction { + let txn = Transaction::UserTransaction(peer_to_peer_txn( + sender.account(), + receiver.account(), + sender.sequence_number(), + transfer_amount, + 100, + )) + .into(); + sender.increment_sequence_number(); + txn +} + +pub fn compare_txn_outputs( + unsharded_txn_output: Vec, + sharded_txn_output: Vec, +) { + assert_eq!(unsharded_txn_output.len(), sharded_txn_output.len()); + for i in 0..unsharded_txn_output.len() { + assert_eq!( + unsharded_txn_output[i].status(), + sharded_txn_output[i].status() + ); + assert_eq!( + unsharded_txn_output[i].gas_used(), + sharded_txn_output[i].gas_used() + ); + //assert_eq!(unsharded_txn_output[i].write_set(), sharded_txn_output[i].write_set()); + assert_eq!( + unsharded_txn_output[i].events(), + sharded_txn_output[i].events() + ); + // Global supply tracking for coin is not supported in sharded execution yet, so we filter + // out the table item from the write set, which has the global supply. This is a hack until + // we support global supply tracking in sharded execution. 
+ let unsharded_write_set_without_table_item = unsharded_txn_output[i] + .write_set() + .into_iter() + .filter(|(k, _)| matches!(k.inner(), &StateKeyInner::AccessPath(_))) + .collect::>(); + let sharded_write_set_without_table_item = sharded_txn_output[i] + .write_set() + .into_iter() + .filter(|(k, _)| matches!(k.inner(), &StateKeyInner::AccessPath(_))) + .collect::>(); + assert_eq!( + unsharded_write_set_without_table_item, + sharded_write_set_without_table_item + ); + } +} + +pub fn test_sharded_block_executor_no_conflict>( + sharded_block_executor: ShardedBlockExecutor, +) { + let num_txns = 400; + let num_shards = 8; + let mut executor = FakeExecutor::from_head_genesis(); + let mut transactions = Vec::new(); + for _ in 0..num_txns { + transactions.push(generate_non_conflicting_p2p(&mut executor).0) + } + let partitioner = ShardedBlockPartitioner::new(num_shards); + let partitioned_txns = partitioner.partition(transactions.clone(), 2, 0.9); + let sharded_txn_output = sharded_block_executor + .execute_block( + Arc::new(executor.data_store().clone()), + partitioned_txns, + 2, + None, + ) + .unwrap(); + let unsharded_txn_output = AptosVM::execute_block( + transactions.into_iter().map(|t| t.into_txn()).collect(), + &executor.data_store(), + None, + ) + .unwrap(); + compare_txn_outputs(unsharded_txn_output, sharded_txn_output); +} + +pub fn sharded_block_executor_with_conflict>( + sharded_block_executor: ShardedBlockExecutor, + concurrency: usize, +) { + let num_txns = 800; + let num_shards = sharded_block_executor.num_shards(); + let num_accounts = 80; + let mut executor = FakeExecutor::from_head_genesis(); + let mut transactions = Vec::new(); + let mut accounts = Vec::new(); + let mut txn_hash_to_account = HashMap::new(); + for _ in 0..num_accounts { + let account = generate_account_at(&mut executor, AccountAddress::random()); + accounts.push(Mutex::new(account)); + } + for i in 1..num_txns / num_accounts { + for j in 0..num_accounts { + let sender = &mut accounts[j].lock().unwrap(); + let sender_addr = *sender.address(); + let receiver = &accounts[(j + i) % num_accounts].lock().unwrap(); + let transfer_amount = 1_000; + let txn = generate_p2p_txn(sender, receiver, transfer_amount); + txn_hash_to_account.insert(txn.transaction().hash(), sender_addr); + transactions.push(txn) + } + } + + let partitioner = ShardedBlockPartitioner::new(num_shards); + let partitioned_txns = partitioner.partition(transactions.clone(), 8, 0.9); + + let execution_ordered_txns = SubBlocksForShard::flatten(partitioned_txns.clone()) + .into_iter() + .map(|t| t.into_txn()) + .collect(); + let sharded_txn_output = sharded_block_executor + .execute_block( + Arc::new(executor.data_store().clone()), + partitioned_txns, + concurrency, + None, + ) + .unwrap(); + + let unsharded_txn_output = + AptosVM::execute_block(execution_ordered_txns, &executor.data_store(), None).unwrap(); + compare_txn_outputs(unsharded_txn_output, sharded_txn_output); +} + +pub fn sharded_block_executor_with_random_transfers>( + sharded_block_executor: ShardedBlockExecutor, + concurrency: usize, +) { + let mut rng = OsRng; + let max_accounts = 200; + let max_txns = 1000; + let num_accounts = rng.gen_range(1, max_accounts); + let mut accounts = Vec::new(); + let mut executor = FakeExecutor::from_head_genesis(); + + for _ in 0..num_accounts { + let account = generate_account_at(&mut executor, AccountAddress::random()); + accounts.push(Mutex::new(account)); + } + + let num_txns = rng.gen_range(1, max_txns); + let num_shards = 
sharded_block_executor.num_shards(); + + let mut transactions = Vec::new(); + + for _ in 0..num_txns { + let indices = rand::seq::index::sample(&mut rng, num_accounts, 2); + let sender = &mut accounts[indices.index(0)].lock().unwrap(); + let receiver = &accounts[indices.index(1)].lock().unwrap(); + let transfer_amount = rng.gen_range(1, 1000); + let txn = generate_p2p_txn(sender, receiver, transfer_amount); + transactions.push(txn) + } + + let partitioner = ShardedBlockPartitioner::new(num_shards); + let partitioned_txns = partitioner.partition(transactions.clone(), 8, 0.9); + + let execution_ordered_txns = SubBlocksForShard::flatten(partitioned_txns.clone()) + .into_iter() + .map(|t| t.into_txn()) + .collect(); + + let sharded_txn_output = sharded_block_executor + .execute_block( + Arc::new(executor.data_store().clone()), + partitioned_txns, + concurrency, + None, + ) + .unwrap(); + + let unsharded_txn_output = + AptosVM::execute_block(execution_ordered_txns, &executor.data_store(), None).unwrap(); + compare_txn_outputs(unsharded_txn_output, sharded_txn_output); +} diff --git a/aptos-move/aptos-vm/src/sharded_block_executor/tests.rs b/aptos-move/aptos-vm/src/sharded_block_executor/tests.rs index 49606f9d9ef70..526ced74504a7 100644 --- a/aptos-move/aptos-vm/src/sharded_block_executor/tests.rs +++ b/aptos-move/aptos-vm/src/sharded_block_executor/tests.rs @@ -1,257 +1,53 @@ // Copyright © Aptos Foundation use crate::{ - sharded_block_executor::sharded_executor_client::ShardedExecutorClient, AptosVM, - ShardedBlockExecutor, VMExecutor, + sharded_block_executor::{local_executor_shard::LocalExecutorService, test_utils}, + ShardedBlockExecutor, }; -use aptos_block_partitioner::sharded_block_partitioner::ShardedBlockPartitioner; -use aptos_crypto::hash::CryptoHash; -use aptos_language_e2e_tests::{ - account::AccountData, common_transactions::peer_to_peer_txn, executor::FakeExecutor, -}; -use aptos_types::{ - block_executor::partitioner::SubBlocksForShard, - state_store::state_key::StateKeyInner, - transaction::{analyzed_transaction::AnalyzedTransaction, Transaction, TransactionOutput}, -}; -use move_core_types::account_address::AccountAddress; use rand::{rngs::OsRng, Rng}; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - -fn generate_account_at(executor: &mut FakeExecutor, address: AccountAddress) -> AccountData { - executor.new_account_data_at(address) -} - -fn generate_non_conflicting_sender_receiver( - executor: &mut FakeExecutor, -) -> (AccountData, AccountData) { - let sender = executor.create_raw_account_data(3_000_000_000, 0); - let receiver = executor.create_raw_account_data(3_000_000_000, 0); - executor.add_account_data(&sender); - executor.add_account_data(&receiver); - (sender, receiver) -} - -fn generate_non_conflicting_p2p( - executor: &mut FakeExecutor, -) -> (AnalyzedTransaction, AccountData, AccountData) { - let (mut sender, receiver) = generate_non_conflicting_sender_receiver(executor); - let transfer_amount = 1_000; - let txn = generate_p2p_txn(&mut sender, &receiver, transfer_amount); - // execute transaction - (txn, sender, receiver) -} - -fn generate_p2p_txn( - sender: &mut AccountData, - receiver: &AccountData, - transfer_amount: u64, -) -> AnalyzedTransaction { - let txn = Transaction::UserTransaction(peer_to_peer_txn( - sender.account(), - receiver.account(), - sender.sequence_number(), - transfer_amount, - 100, - )) - .into(); - sender.increment_sequence_number(); - txn -} - -fn compare_txn_outputs( - unsharded_txn_output: Vec, - sharded_txn_output: Vec, -) 
{ - assert_eq!(unsharded_txn_output.len(), sharded_txn_output.len()); - for i in 0..unsharded_txn_output.len() { - assert_eq!( - unsharded_txn_output[i].status(), - sharded_txn_output[i].status() - ); - assert_eq!( - unsharded_txn_output[i].gas_used(), - sharded_txn_output[i].gas_used() - ); - //assert_eq!(unsharded_txn_output[i].write_set(), sharded_txn_output[i].write_set()); - assert_eq!( - unsharded_txn_output[i].events(), - sharded_txn_output[i].events() - ); - // Global supply tracking for coin is not supported in sharded execution yet, so we filter - // out the table item from the write set, which has the global supply. This is a hack until - // we support global supply tracking in sharded execution. - let unsharded_write_set_without_table_item = unsharded_txn_output[i] - .write_set() - .into_iter() - .filter(|(k, _)| matches!(k.inner(), &StateKeyInner::AccessPath(_))) - .collect::>(); - let sharded_write_set_without_table_item = sharded_txn_output[i] - .write_set() - .into_iter() - .filter(|(k, _)| matches!(k.inner(), &StateKeyInner::AccessPath(_))) - .collect::>(); - assert_eq!( - unsharded_write_set_without_table_item, - sharded_write_set_without_table_item - ); - } -} #[test] fn test_sharded_block_executor_no_conflict() { - let num_txns = 400; let num_shards = 8; - let mut executor = FakeExecutor::from_head_genesis(); - let mut transactions = Vec::new(); - for _ in 0..num_txns { - transactions.push(generate_non_conflicting_p2p(&mut executor).0) - } - let partitioner = ShardedBlockPartitioner::new(num_shards); - let partitioned_txns = partitioner.partition(transactions.clone(), 2, 0.9); - let executor_clients = - ShardedExecutorClient::create_sharded_executor_clients(num_shards, Some(2)); - let sharded_block_executor = ShardedBlockExecutor::new(executor_clients); - let sharded_txn_output = sharded_block_executor - .execute_block( - Arc::new(executor.data_store().clone()), - partitioned_txns, - 2, - None, - ) - .unwrap(); - let unsharded_txn_output = AptosVM::execute_block( - transactions.into_iter().map(|t| t.into_txn()).collect(), - &executor.data_store(), - None, - ) - .unwrap(); - compare_txn_outputs(unsharded_txn_output, sharded_txn_output); + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(2)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + test_utils::test_sharded_block_executor_no_conflict(sharded_block_executor); } #[test] // Sharded execution with cross shard conflict doesn't work for now because we don't have // cross round dependency tracking yet. 
fn test_sharded_block_executor_with_conflict_parallel() { - sharded_block_executor_with_conflict(4) + let num_shards = 7; + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(4)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + test_utils::sharded_block_executor_with_conflict(sharded_block_executor, 4); } #[test] fn test_sharded_block_executor_with_conflict_sequential() { - sharded_block_executor_with_conflict(1) -} - -fn sharded_block_executor_with_conflict(concurrency: usize) { - let num_txns = 800; let num_shards = 7; - let num_accounts = 80; - let mut executor = FakeExecutor::from_head_genesis(); - let mut transactions = Vec::new(); - let mut accounts = Vec::new(); - let mut txn_hash_to_account = HashMap::new(); - for _ in 0..num_accounts { - let account = generate_account_at(&mut executor, AccountAddress::random()); - accounts.push(Mutex::new(account)); - } - for i in 1..num_txns / num_accounts { - for j in 0..num_accounts { - let sender = &mut accounts[j].lock().unwrap(); - let sender_addr = *sender.address(); - let receiver = &accounts[(j + i) % num_accounts].lock().unwrap(); - let transfer_amount = 1_000; - let txn = generate_p2p_txn(sender, receiver, transfer_amount); - txn_hash_to_account.insert(txn.transaction().hash(), sender_addr); - transactions.push(txn) - } - } - - let partitioner = ShardedBlockPartitioner::new(num_shards); - let partitioned_txns = partitioner.partition(transactions.clone(), 8, 0.9); - - let execution_ordered_txns = SubBlocksForShard::flatten(partitioned_txns.clone()) - .into_iter() - .map(|t| t.into_txn()) - .collect::>(); - - let executor_clients = - ShardedExecutorClient::create_sharded_executor_clients(num_shards, Some(concurrency)); - let sharded_block_executor = ShardedBlockExecutor::new(executor_clients); - let sharded_txn_output = sharded_block_executor - .execute_block( - Arc::new(executor.data_store().clone()), - partitioned_txns, - concurrency, - None, - ) - .unwrap(); - - let unsharded_txn_output = - AptosVM::execute_block(execution_ordered_txns, &executor.data_store(), None).unwrap(); - compare_txn_outputs(unsharded_txn_output, sharded_txn_output); + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(1)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + test_utils::sharded_block_executor_with_conflict(sharded_block_executor, 1) } #[test] fn test_sharded_block_executor_with_random_transfers_parallel() { - sharded_block_executor_with_random_transfers(4) + let mut rng = OsRng; + let max_num_shards = 32; + let num_shards = rng.gen_range(1, max_num_shards); + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(4)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + test_utils::sharded_block_executor_with_random_transfers(sharded_block_executor, 4) } #[test] fn test_sharded_block_executor_with_random_transfers_sequential() { - sharded_block_executor_with_random_transfers(1) -} - -fn sharded_block_executor_with_random_transfers(concurrency: usize) { let mut rng = OsRng; - let max_accounts = 200; - let max_txns = 1000; let max_num_shards = 32; - let num_accounts = rng.gen_range(1, max_accounts); - let mut accounts = Vec::new(); - let mut executor = FakeExecutor::from_head_genesis(); - - for _ in 0..num_accounts { - let account = generate_account_at(&mut executor, AccountAddress::random()); - accounts.push(Mutex::new(account)); - } - - let num_txns = rng.gen_range(1, max_txns); let num_shards = rng.gen_range(1, 
max_num_shards); - - let mut transactions = Vec::new(); - - for _ in 0..num_txns { - let indices = rand::seq::index::sample(&mut rng, num_accounts, 2); - let sender = &mut accounts[indices.index(0)].lock().unwrap(); - let receiver = &accounts[indices.index(1)].lock().unwrap(); - let transfer_amount = rng.gen_range(1, 1000); - let txn = generate_p2p_txn(sender, receiver, transfer_amount); - transactions.push(txn) - } - - let partitioner = ShardedBlockPartitioner::new(num_shards); - let partitioned_txns = partitioner.partition(transactions.clone(), 8, 0.9); - - let execution_ordered_txns = SubBlocksForShard::flatten(partitioned_txns.clone()) - .into_iter() - .map(|t| t.into_txn()) - .collect::>(); - - let executor_clients = - ShardedExecutorClient::create_sharded_executor_clients(num_shards, Some(concurrency)); - let sharded_block_executor = ShardedBlockExecutor::new(executor_clients); - let sharded_txn_output = sharded_block_executor - .execute_block( - Arc::new(executor.data_store().clone()), - partitioned_txns, - concurrency, - None, - ) - .unwrap(); - - let unsharded_txn_output = - AptosVM::execute_block(execution_ordered_txns, &executor.data_store(), None).unwrap(); - compare_txn_outputs(unsharded_txn_output, sharded_txn_output); + let client = LocalExecutorService::setup_local_executor_shards(num_shards, Some(1)); + let sharded_block_executor = ShardedBlockExecutor::new(client); + test_utils::sharded_block_executor_with_random_transfers(sharded_block_executor, 1) } diff --git a/consensus/safety-rules/src/remote_service.rs b/consensus/safety-rules/src/remote_service.rs index 85605fbe72435..fc7ec1b944f2e 100644 --- a/consensus/safety-rules/src/remote_service.rs +++ b/consensus/safety-rules/src/remote_service.rs @@ -14,7 +14,7 @@ use std::net::SocketAddr; pub trait RemoteService { fn client(&self) -> SerializerClient { let network_client = NetworkClient::new( - "safety-rules", + "safety-rules".to_string(), self.server_address(), self.network_timeout_ms(), ); @@ -35,7 +35,8 @@ pub fn execute(storage: PersistentSafetyStorage, listen_addr: SocketAddr, networ } let mut serializer_service = SerializerService::new(safety_rules); - let mut network_server = NetworkServer::new("safety-rules", listen_addr, network_timeout_ms); + let mut network_server = + NetworkServer::new("safety-rules".to_string(), listen_addr, network_timeout_ms); loop { if let Err(e) = process_one_message(&mut network_server, &mut serializer_service) { diff --git a/execution/executor-service/Cargo.toml b/execution/executor-service/Cargo.toml index 7fcddb11b8db9..b00234d9621d5 100644 --- a/execution/executor-service/Cargo.toml +++ b/execution/executor-service/Cargo.toml @@ -14,6 +14,7 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } +aptos-block-partitioner = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } aptos-executor-types = { workspace = true } @@ -22,11 +23,20 @@ aptos-logger = { workspace = true } aptos-retrier = { workspace = true } aptos-secure-net = { workspace = true } aptos-state-view = { workspace = true } +aptos-storage-interface = { workspace = true } aptos-types = { workspace = true } -aptos-vm = { workspace = true } +aptos-vm = { workspace = true, features = ["testing"] } bcs = { workspace = true } clap = { workspace = true } +crossbeam-channel = { workspace = true } itertools = { workspace = true } +num_cpus = { workspace = true } +rand = { workspace = true } +rayon = { workspace = true } serde = { workspace = true } serde_json = { 
workspace = true } thiserror = { workspace = true } + +[dev-dependencies] +aptos-language-e2e-tests = { workspace = true } +aptos-vm = { workspace = true } diff --git a/execution/executor-service/src/lib.rs b/execution/executor-service/src/lib.rs index 9994542a1cd8a..031f7ab265efe 100644 --- a/execution/executor-service/src/lib.rs +++ b/execution/executor-service/src/lib.rs @@ -10,18 +10,30 @@ use serde::{Deserialize, Serialize}; mod error; pub mod process_executor_service; -pub mod remote_executor_client; +mod remote_cordinator_client; +mod remote_cross_shard_client; +mod remote_executor_client; pub mod remote_executor_service; #[cfg(test)] +mod test_utils; +#[cfg(test)] +mod tests; +#[cfg(test)] mod thread_executor_service; #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct BlockExecutionResult { +pub struct RemoteExecutionResult { pub inner: Result>, VMStatus>, } +impl RemoteExecutionResult { + pub fn new(inner: Result>, VMStatus>) -> Self { + Self { inner } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] -pub enum BlockExecutionRequest { +pub enum RemoteExecutionRequest { ExecuteBlock(ExecuteBlockCommand), } @@ -37,3 +49,21 @@ pub struct ExecuteBlockCommand { pub(crate) concurrency_level: usize, pub(crate) maybe_block_gas_limit: Option, } + +impl ExecuteBlockCommand { + pub fn into( + self, + ) -> ( + SubBlocksForShard, + InMemoryStateView, + usize, + Option, + ) { + ( + self.sub_blocks, + self.state_view, + self.concurrency_level, + self.maybe_block_gas_limit, + ) + } +} diff --git a/execution/executor-service/src/main.rs b/execution/executor-service/src/main.rs index f313e7fe530ed..d1c6a31e89212 100644 --- a/execution/executor-service/src/main.rs +++ b/execution/executor-service/src/main.rs @@ -1,9 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use aptos_executor_service::process_executor_service::ProcessExecutorService; use clap::Parser; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; #[derive(Debug, Parser)] struct Args { @@ -15,13 +13,14 @@ struct Args { } fn main() { - let args = Args::parse(); + // TODO (skedia): Uncomment this once the executor service is implemented. + let _args = Args::parse(); aptos_logger::Logger::new().init(); - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.server_port); - let executor_service = - ProcessExecutorService::new(server_addr, 1000, args.num_executor_threads); - executor_service.run(); + // let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), args.server_port); + // let executor_service = + // ProcessExecutorService::new(server_addr, 1000, args.num_executor_threads); + // executor_service.run(); } #[test] diff --git a/execution/executor-service/src/process_executor_service.rs b/execution/executor-service/src/process_executor_service.rs index f2c888be0b209..cfc849e7dba82 100644 --- a/execution/executor-service/src/process_executor_service.rs +++ b/execution/executor-service/src/process_executor_service.rs @@ -1,54 +1,39 @@ // Copyright © Aptos Foundation -use crate::{ - remote_executor_service, - remote_executor_service::{ExecutorService, RemoteExecutorService}, -}; +use crate::remote_executor_service::ExecutorService; use aptos_logger::info; -use aptos_secure_net::NetworkServer; +use aptos_types::block_executor::partitioner::ShardId; use std::net::SocketAddr; /// An implementation of the remote executor service that runs in a standalone process. 
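A usage sketch for the constructor introduced just below; the shard count, thread count, and socket addresses are placeholders. Each shard process binds its own entry in the address list and dials back to the coordinator:

```rust
use aptos_executor_service::process_executor_service::ProcessExecutorService;
use std::net::SocketAddr;

fn main() {
    let coordinator: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    let shards: Vec<SocketAddr> = vec![
        "127.0.0.1:8081".parse().unwrap(),
        "127.0.0.1:8082".parse().unwrap(),
    ];
    // Runs shard 0 of 2 with 8 executor threads; constructing the service
    // starts its command loop (ExecutorService::start in this diff).
    let _service = ProcessExecutorService::new(0, shards.len(), 8, coordinator, shards);
}
```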
 pub struct ProcessExecutorService {
-    server_addr: SocketAddr,
-    network_timeout_ms: u64,
-    num_executor_threads: usize,
+    _executor_service: ExecutorService,
 }
 
 impl ProcessExecutorService {
-    pub fn new(server_addr: SocketAddr, network_timeout: u64, num_executor_threads: usize) -> Self {
-        Self {
-            server_addr,
-            network_timeout_ms: network_timeout,
-            num_executor_threads,
-        }
-    }
-
-    pub fn run(&self) {
+    pub fn new(
+        shard_id: ShardId,
+        num_shards: usize,
+        num_threads: usize,
+        coordinator_address: SocketAddr,
+        remote_shard_addresses: Vec<SocketAddr>,
+    ) -> Self {
+        let self_address = remote_shard_addresses[shard_id];
         info!(
             "Starting process remote executor service on {}",
-            self.server_addr
+            self_address
         );
-        let network_server = NetworkServer::new(
-            "process-executor-service",
-            self.server_addr,
-            self.network_timeout_ms,
+        let mut executor_service = ExecutorService::new(
+            shard_id,
+            num_shards,
+            num_threads,
+            self_address,
+            coordinator_address,
+            remote_shard_addresses,
         );
-        let executor_service = ExecutorService::new(self.num_executor_threads);
-        remote_executor_service::execute(network_server, executor_service);
-    }
-}
-
-impl RemoteExecutorService for ProcessExecutorService {
-    fn server_address(&self) -> SocketAddr {
-        self.server_addr
-    }
-
-    fn network_timeout_ms(&self) -> u64 {
-        self.network_timeout_ms
-    }
-
-    fn executor_threads(&self) -> usize {
-        self.num_executor_threads
+        executor_service.start();
+        Self {
+            _executor_service: executor_service,
+        }
     }
 }
diff --git a/execution/executor-service/src/remote_cordinator_client.rs b/execution/executor-service/src/remote_cordinator_client.rs
new file mode 100644
index 0000000000000..8b7e9a4045b46
--- /dev/null
+++ b/execution/executor-service/src/remote_cordinator_client.rs
@@ -0,0 +1,61 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+use crate::{RemoteExecutionRequest, RemoteExecutionResult};
+use aptos_secure_net::network_controller::{Message, NetworkController};
+use aptos_state_view::in_memory_state_view::InMemoryStateView;
+use aptos_types::{
+    block_executor::partitioner::ShardId, transaction::TransactionOutput, vm_status::VMStatus,
+};
+use aptos_vm::sharded_block_executor::{
+    coordinator_client::CoordinatorClient, ExecutorShardCommand,
+};
+use crossbeam_channel::{Receiver, Sender};
+use std::{net::SocketAddr, sync::Arc};
+
+pub struct RemoteCoordinatorClient {
+    command_rx: Receiver<Message>,
+    result_tx: Sender<Message>,
+}
+
+impl RemoteCoordinatorClient {
+    pub fn new(
+        shard_id: ShardId,
+        controller: &mut NetworkController,
+        coordinator_address: SocketAddr,
+    ) -> Self {
+        let execute_command_type = format!("execute_command_{}", shard_id);
+        let execute_result_type = format!("execute_result_{}", shard_id);
+        let command_rx = controller.create_inbound_channel(execute_command_type);
+        let result_tx =
+            controller.create_outbound_channel(coordinator_address, execute_result_type);
+
+        Self {
+            command_rx,
+            result_tx,
+        }
+    }
+}
+
+impl CoordinatorClient<InMemoryStateView> for RemoteCoordinatorClient {
+    fn receive_execute_command(&self) -> ExecutorShardCommand<InMemoryStateView> {
+        let message = self.command_rx.recv().unwrap();
+        let request: RemoteExecutionRequest = bcs::from_bytes(&message.data).unwrap();
+        match request {
+            RemoteExecutionRequest::ExecuteBlock(command) => {
+                let (sub_blocks, state_view, concurrency, gas_limit) = command.into();
+                ExecutorShardCommand::ExecuteSubBlocks(
+                    Arc::new(state_view),
+                    sub_blocks,
+                    concurrency,
+                    gas_limit,
+                )
+            },
+        }
+    }
+
+    fn send_execution_result(&self, result: Result<Vec<Vec<TransactionOutput>>, VMStatus>) {
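+        // The outputs are wrapped in a RemoteExecutionResult, BCS-serialized, and sent back
+        // to the coordinator on this shard's "execute_result_{shard_id}" outbound channel.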
+        let remote_execution_result = RemoteExecutionResult::new(result);
+        let output_message = bcs::to_bytes(&remote_execution_result).unwrap();
+        self.result_tx.send(Message::new(output_message)).unwrap();
+    }
+}
diff --git a/execution/executor-service/src/remote_cross_shard_client.rs b/execution/executor-service/src/remote_cross_shard_client.rs
new file mode 100644
index 0000000000000..282cac0e8ef38
--- /dev/null
+++ b/execution/executor-service/src/remote_cross_shard_client.rs
@@ -0,0 +1,64 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+use aptos_block_partitioner::sharded_block_partitioner::MAX_ALLOWED_PARTITIONING_ROUNDS;
+use aptos_secure_net::network_controller::{Message, NetworkController};
+use aptos_types::block_executor::partitioner::{RoundId, ShardId};
+use aptos_vm::sharded_block_executor::{
+    cross_shard_client::CrossShardClient, messages::CrossShardMsg,
+};
+use crossbeam_channel::{Receiver, Sender};
+use std::{
+    net::SocketAddr,
+    sync::{Arc, Mutex},
+};
+
+pub struct RemoteCrossShardClient {
+    // The senders of cross-shard messages to other shards per round.
+    message_txs: Arc<Vec<Vec<Mutex<Sender<Message>>>>>,
+    // The receivers of cross shard messages from other shards per round.
+    message_rxs: Arc<Vec<Mutex<Receiver<Message>>>>,
+}
+
+impl RemoteCrossShardClient {
+    pub fn new(controller: &mut NetworkController, shard_addresses: Vec<SocketAddr>) -> Self {
+        let mut message_txs = vec![];
+        let mut message_rxs = vec![];
+        // Create outbound channels for each shard per round.
+        for remote_address in shard_addresses.iter() {
+            let mut txs = vec![];
+            for round in 0..MAX_ALLOWED_PARTITIONING_ROUNDS {
+                let message_type = format!("cross_shard_{}", round);
+                let tx = controller.create_outbound_channel(*remote_address, message_type);
+                txs.push(Mutex::new(tx));
+            }
+            message_txs.push(txs);
+        }
+
+        // Create inbound channels for each round
+        for round in 0..MAX_ALLOWED_PARTITIONING_ROUNDS {
+            let message_type = format!("cross_shard_{}", round);
+            let rx = controller.create_inbound_channel(message_type);
+            message_rxs.push(Mutex::new(rx));
+        }
+
+        Self {
+            message_txs: Arc::new(message_txs),
+            message_rxs: Arc::new(message_rxs),
+        }
+    }
+}
+
+impl CrossShardClient for RemoteCrossShardClient {
+    fn send_cross_shard_msg(&self, shard_id: ShardId, round: RoundId, msg: CrossShardMsg) {
+        let input_message = bcs::to_bytes(&msg).unwrap();
+        let tx = self.message_txs[shard_id][round].lock().unwrap();
+        tx.send(Message::new(input_message)).unwrap();
+    }
+
+    fn receive_cross_shard_msg(&self, current_round: RoundId) -> CrossShardMsg {
+        let rx = self.message_rxs[current_round].lock().unwrap();
+        let message = rx.recv().unwrap();
+        let msg: CrossShardMsg = bcs::from_bytes(&message.to_bytes()).unwrap();
+        msg
+    }
+}
diff --git a/execution/executor-service/src/remote_executor_client.rs b/execution/executor-service/src/remote_executor_client.rs
index b4c91a3e0ebde..3d59d16916ee4 100644
--- a/execution/executor-service/src/remote_executor_client.rs
+++ b/execution/executor-service/src/remote_executor_client.rs
@@ -1,77 +1,114 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
-
-use crate::{error::Error, BlockExecutionRequest, BlockExecutionResult, ExecuteBlockCommand};
-use aptos_logger::error;
-use aptos_retrier::{fixed_retry_strategy, retry};
-use aptos_secure_net::NetworkClient;
+use crate::{ExecuteBlockCommand, RemoteExecutionRequest, RemoteExecutionResult};
+use aptos_logger::trace;
+use aptos_secure_net::network_controller::{Message, NetworkController};
 use aptos_state_view::StateView;
 use aptos_types::{
     block_executor::partitioner::SubBlocksForShard,
     transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput},
     vm_status::VMStatus,
 };
-use aptos_vm::sharded_block_executor::block_executor_client::BlockExecutorClient;
-use std::{net::SocketAddr, sync::Mutex};
+use aptos_vm::sharded_block_executor::executor_client::ExecutorClient;
+use crossbeam_channel::{Receiver, Sender};
+use std::{
+    net::SocketAddr,
+    ops::Deref,
+    sync::{Arc, Mutex},
+};
 
-/// An implementation of [`BlockExecutorClient`] that supports executing blocks remotely.
-pub struct RemoteExecutorClient {
-    network_client: Mutex<NetworkClient>,
+#[allow(dead_code)]
+pub struct RemoteExecutorClient<S: StateView + Sync + Send + 'static> {
+    // Channels to send execute block commands to the executor shards.
+    command_txs: Arc<Vec<Mutex<Sender<Message>>>>,
+    // Channels to receive execution results from the executor shards.
+    result_rxs: Vec<Receiver<Message>>,
+    // Thread pool used to pre-fetch the state values for the block in parallel and create an in-memory state view.
+    thread_pool: Arc<rayon::ThreadPool>,
+    phantom: std::marker::PhantomData<S>,
 }
 
-impl RemoteExecutorClient {
-    pub fn new(server_address: SocketAddr, network_timeout_ms: u64) -> Self {
-        let network_client = NetworkClient::new(
-            "remote-executor-service",
-            server_address,
-            network_timeout_ms,
+#[allow(dead_code)]
+impl<S: StateView + Sync + Send + 'static> RemoteExecutorClient<S> {
+    pub fn new(
+        remote_shard_addresses: Vec<SocketAddr>,
+        controller: &mut NetworkController,
+        num_threads: Option<usize>,
+    ) -> Self {
+        let num_threads = num_threads.unwrap_or_else(num_cpus::get);
+        let thread_pool = Arc::new(
+            rayon::ThreadPoolBuilder::new()
+                .num_threads(num_threads)
+                .build()
+                .unwrap(),
         );
+        let (command_txs, result_rxs) = remote_shard_addresses
+            .iter()
+            .enumerate()
+            .map(|(shard_id, address)| {
+                let execute_command_type = format!("execute_command_{}", shard_id);
+                let execute_result_type = format!("execute_result_{}", shard_id);
+                let command_tx =
+                    Mutex::new(controller.create_outbound_channel(*address, execute_command_type));
+                let result_rx = controller.create_inbound_channel(execute_result_type);
+                (command_tx, result_rx)
+            })
+            .unzip();
 
         Self {
-            network_client: Mutex::new(network_client),
+            command_txs: Arc::new(command_txs),
+            result_rxs,
+            thread_pool,
+            phantom: std::marker::PhantomData,
         }
     }
+}
 
-    fn execute_block_inner(
-        &self,
-        execution_request: BlockExecutionRequest,
-    ) -> Result<BlockExecutionResult, Error> {
-        let input_message = bcs::to_bytes(&execution_request)?;
-        let mut network_client = self.network_client.lock().unwrap();
-        network_client.write(&input_message)?;
-        let bytes = network_client.read()?;
-        Ok(bcs::from_bytes(&bytes)?)
-    }
-
-    fn execute_block_with_retry(
-        &self,
-        execution_request: BlockExecutionRequest,
-    ) -> BlockExecutionResult {
-        retry(fixed_retry_strategy(5, 20), || {
-            let res = self.execute_block_inner(execution_request.clone());
-            if let Err(e) = &res {
-                error!("Failed to execute block: {:?}", e);
-            }
-            res
-        })
-        .unwrap()
+impl<S: StateView + Sync + Send + 'static> ExecutorClient<S> for RemoteExecutorClient<S> {
+    fn num_shards(&self) -> usize {
+        self.command_txs.len()
     }
-}
 
-impl BlockExecutorClient for RemoteExecutorClient {
-    fn execute_block<S: StateView + Sync + Send>(
+    fn execute_block(
         &self,
-        sub_blocks: SubBlocksForShard<AnalyzedTransaction>,
-        state_view: &S,
-        concurrency_level: usize,
+        state_view: Arc<S>,
+        block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
+        concurrency_level_per_shard: usize,
         maybe_block_gas_limit: Option<u64>,
-    ) -> Result<Vec<TransactionOutput>, VMStatus> {
-        let input = BlockExecutionRequest::ExecuteBlock(ExecuteBlockCommand {
-            sub_blocks,
-            state_view: S::as_in_memory_state_view(state_view),
-            concurrency_level,
-            maybe_block_gas_limit,
+    ) {
+        self.thread_pool.scope(|s| {
+            for (shard_id, sub_blocks) in block.into_iter().enumerate() {
+                let state_view = state_view.clone();
+                let senders = self.command_txs.clone();
+                s.spawn(move |_| {
+                    let execution_request =
+                        RemoteExecutionRequest::ExecuteBlock(ExecuteBlockCommand {
+                            sub_blocks,
+                            // TODO(skedia): Instead of serializing the entire state view, we should
+                            // serialize only the state values needed for the shard.
+                            state_view: S::as_in_memory_state_view(state_view.deref()),
+                            concurrency_level: concurrency_level_per_shard,
+                            maybe_block_gas_limit,
+                        });
+
+                    senders[shard_id]
+                        .lock()
+                        .unwrap()
+                        .send(Message::new(bcs::to_bytes(&execution_request).unwrap()))
+                        .unwrap()
+                });
+            }
         });
+    }
 
-        self.execute_block_with_retry(input).inner
+    fn get_execution_result(&self) -> Result<Vec<Vec<Vec<TransactionOutput>>>, VMStatus> {
+        trace!("RemoteExecutorClient Waiting for results");
+        let mut results = vec![];
+        for rx in self.result_rxs.iter() {
+            let received_bytes = rx.recv().unwrap().to_bytes();
+            let result: RemoteExecutionResult = bcs::from_bytes(&received_bytes).unwrap();
+            results.push(result.inner?);
+        }
+        Ok(results)
     }
 }
diff --git a/execution/executor-service/src/remote_executor_service.rs b/execution/executor-service/src/remote_executor_service.rs
index 968837275b08d..781defb0996fb 100644
--- a/execution/executor-service/src/remote_executor_service.rs
+++ b/execution/executor-service/src/remote_executor_service.rs
@@ -2,252 +2,59 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::{
-    error::Error, remote_executor_client::RemoteExecutorClient, BlockExecutionRequest,
-    BlockExecutionResult,
+    remote_cordinator_client::RemoteCoordinatorClient,
+    remote_cross_shard_client::RemoteCrossShardClient,
 };
-use aptos_logger::{error, info};
-use aptos_secure_net::NetworkServer;
-use aptos_vm::sharded_block_executor::block_executor_client::{
-    BlockExecutorClient, VMExecutorClient,
-};
-use std::net::SocketAddr;
+use aptos_secure_net::network_controller::NetworkController;
+use aptos_state_view::in_memory_state_view::InMemoryStateView;
+use aptos_types::block_executor::partitioner::ShardId;
+use aptos_vm::sharded_block_executor::sharded_executor_service::ShardedExecutorService;
+use std::{net::SocketAddr, sync::Arc};
 
 /// A service that provides support for remote execution. Essentially, it reads a request from
 /// the remote executor client and executes the block locally and returns the result.
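+/// Each `ExecutorService` owns one shard: it receives `execute_command_{shard_id}`
+/// messages from the coordinator, exchanges `cross_shard_{round}` messages with peer
+/// shards, and replies on `execute_result_{shard_id}` (the message types registered
+/// by the two clients wired up below).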
 pub struct ExecutorService {
-    client: VMExecutorClient,
+    controller: NetworkController,
+    executor_service: Arc<ShardedExecutorService<InMemoryStateView>>,
 }
 
 impl ExecutorService {
-    pub fn new(num_executor_threads: usize) -> Self {
-        Self {
-            client: VMExecutorClient::new(num_executor_threads),
-        }
-    }
-
-    pub fn handle_message(&self, execution_message: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let input = bcs::from_bytes(&execution_message)?;
-        let result = self.handle_execution_request(input)?;
-        Ok(bcs::to_bytes(&result)?)
-    }
-
-    pub fn handle_execution_request(
-        &self,
-        execution_request: BlockExecutionRequest,
-    ) -> Result<BlockExecutionResult, Error> {
-        let result = match execution_request {
-            BlockExecutionRequest::ExecuteBlock(command) => self.client.execute_block(
-                command.sub_blocks,
-                &command.state_view,
-                command.concurrency_level,
-                command.maybe_block_gas_limit,
-            ),
-        };
-        Ok(BlockExecutionResult { inner: result })
-    }
-}
-
-pub trait RemoteExecutorService {
-    fn client(&self) -> RemoteExecutorClient {
-        RemoteExecutorClient::new(self.server_address(), self.network_timeout_ms())
-    }
-
-    fn server_address(&self) -> SocketAddr;
-
-    /// Network Timeout in milliseconds.
-    fn network_timeout_ms(&self) -> u64;
-
-    fn executor_threads(&self) -> usize;
-}
-
-pub fn execute(mut network_server: NetworkServer, executor_service: ExecutorService) {
-    loop {
-        if let Err(e) = process_one_message(&mut network_server, &executor_service) {
-            error!("Failed to process message: {}", e);
-        }
-    }
-}
+    pub fn new(
+        shard_id: ShardId,
+        num_shards: usize,
+        num_threads: usize,
+        self_address: SocketAddr,
+        coordinator_address: SocketAddr,
+        remote_shard_addresses: Vec<SocketAddr>,
+    ) -> Self {
+        let service_name = format!("executor_service-{}", shard_id);
+        let mut controller = NetworkController::new(service_name, self_address, 5000);
+        let coordinator_client = Arc::new(RemoteCoordinatorClient::new(
+            shard_id,
+            &mut controller,
+            coordinator_address,
+        ));
+        let cross_shard_client = Arc::new(RemoteCrossShardClient::new(
+            &mut controller,
+            remote_shard_addresses,
+        ));
+
+        let executor_service = Arc::new(ShardedExecutorService::new(
+            shard_id,
+            num_shards,
+            num_threads,
+            coordinator_client,
+            cross_shard_client,
+        ));
 
-fn process_one_message(
-    network_server: &mut NetworkServer,
-    executor_service: &ExecutorService,
-) -> Result<(), Error> {
-    let request = network_server.read()?;
-    let response = executor_service.handle_message(request)?;
-    info!("server sending response");
-    network_server.write(&response)?;
-    Ok(())
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::{
-        remote_executor_service::RemoteExecutorService,
-        thread_executor_service::ThreadExecutorService,
-    };
-    use aptos_language_e2e_tests::{
-        account::AccountData, common_transactions::peer_to_peer_txn, executor::FakeExecutor,
-    };
-    use aptos_types::{
-        account_config::{DepositEvent, WithdrawEvent},
-        block_executor::partitioner::{
-            CrossShardDependencies, SubBlock, SubBlocksForShard, TransactionWithDependencies,
-        },
-        transaction::{
-            analyzed_transaction::AnalyzedTransaction, ExecutionStatus, Transaction,
-            TransactionOutput, TransactionStatus,
-        },
-    };
-    use aptos_vm::sharded_block_executor::{
-        block_executor_client::BlockExecutorClient, ShardedBlockExecutor,
-    };
-    use std::sync::Arc;
-
-    fn generate_transactions(
-        executor: &mut FakeExecutor,
-    ) -> (Vec<AnalyzedTransaction>, AccountData) {
-        let sender = executor.create_raw_account_data(3_000_000_000, 10);
-        let receiver = executor.create_raw_account_data(3_000_000_000, 10);
-        executor.add_account_data(&sender);
-        executor.add_account_data(&receiver);
-
-        let transfer_amount = 1_000;
-
-        // execute transaction
-        let txns: Vec<AnalyzedTransaction> = vec![
-            Transaction::UserTransaction(peer_to_peer_txn(
-                sender.account(),
-                receiver.account(),
-                10,
-                transfer_amount,
-                100,
-            ))
-            .into(),
-            Transaction::UserTransaction(peer_to_peer_txn(
-                sender.account(),
-                receiver.account(),
-                11,
-                transfer_amount,
-                100,
-            ))
-            .into(),
-            Transaction::UserTransaction(peer_to_peer_txn(
-                sender.account(),
-                receiver.account(),
-                12,
-                transfer_amount,
-                100,
-            ))
-            .into(),
-            Transaction::UserTransaction(peer_to_peer_txn(
-                sender.account(),
-                receiver.account(),
-                13,
-                transfer_amount,
-                100,
-            ))
-            .into(),
-        ];
-        (txns, receiver)
-    }
-
-    fn verify_txn_output(
-        transfer_amount: u64,
-        output: &[TransactionOutput],
-        executor: &mut FakeExecutor,
-        receiver: &AccountData,
-    ) {
-        for (idx, txn_output) in output.iter().enumerate() {
-            assert_eq!(
-                txn_output.status(),
-                &TransactionStatus::Keep(ExecutionStatus::Success)
-            );
-
-            // check events
-            for event in txn_output.events() {
-                if let Ok(payload) = WithdrawEvent::try_from(event) {
-                    assert_eq!(transfer_amount, payload.amount());
-                } else if let Ok(payload) = DepositEvent::try_from(event) {
-                    if payload.amount() == 0 {
-                        continue;
-                    }
-                    assert_eq!(transfer_amount, payload.amount());
-                } else {
-                    panic!("Unexpected Event Type")
-                }
-            }
-
-            let original_receiver_balance = executor
-                .read_coin_store_resource(receiver.account())
-                .expect("receiver balance must exist");
-            executor.apply_write_set(txn_output.write_set());
-
-            // check that numbers in stored DB are correct
-            let receiver_balance = original_receiver_balance.coin() + transfer_amount;
-            let updated_receiver_balance = executor
-                .read_coin_store_resource(receiver.account())
-                .expect("receiver balance must exist");
-            assert_eq!(receiver_balance, updated_receiver_balance.coin());
-            assert_eq!(
-                idx as u64 + 1,
-                updated_receiver_balance.deposit_events().count()
-            );
-        }
-    }
-
-    #[test]
-    fn test_remote_block_execute() {
-        let executor_service = ThreadExecutorService::new(5000, 2);
-        // Uncomment for testing with a real server
-        // let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
-        // let client = RemoteExecutorClient::new(server_addr, 1000);
-
-        let client = executor_service.client();
-        let mut executor = FakeExecutor::from_head_genesis();
-        for _ in 0..5 {
-            let (txns, receiver) = generate_transactions(&mut executor);
-            let txns_with_deps = txns
-                .into_iter()
-                .map(|txn| TransactionWithDependencies::new(txn, CrossShardDependencies::default()))
-                .collect::<Vec<_>>();
-            let sub_block = SubBlock::new(0, txns_with_deps);
-            let sub_blocks_for_shard = SubBlocksForShard::new(0, vec![sub_block]);
-
-            let output = client
-                .execute_block(sub_blocks_for_shard, executor.data_store(), 2, None)
-                .unwrap();
-            verify_txn_output(1_000, &output[0], &mut executor, &receiver);
+        Self {
+            controller,
+            executor_service,
         }
     }
 
-    #[test]
-    fn test_sharded_remote_block_executor() {
-        let executor_service = ThreadExecutorService::new(5000, 2);
-        let client = executor_service.client();
-        // Uncomment for testing with a real server
-        // let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
-        // let client = RemoteExecutorClient::new(server_addr, 1000);
-
-        let sharded_block_executor = ShardedBlockExecutor::new(vec![client]);
-        let mut executor = FakeExecutor::from_head_genesis();
-        for _ in 0..5 {
-            let (txns, receiver) = generate_transactions(&mut executor);
-            let txns_with_deps = txns
-                .into_iter()
-                .map(|txn| TransactionWithDependencies::new(txn, CrossShardDependencies::default()))
-                .collect::<Vec<_>>();
-            let sub_block = SubBlock::new(0, txns_with_deps);
-            let sub_blocks_for_shard = SubBlocksForShard::new(0, vec![sub_block]);
-
-            let output = sharded_block_executor
-                .execute_block(
-                    Arc::new(executor.data_store().clone()),
-                    vec![sub_blocks_for_shard],
-                    2,
-                    None,
-                )
-                .unwrap();
-            verify_txn_output(1_000, &output, &mut executor, &receiver);
-        }
+    pub fn start(&mut self) {
+        self.controller.start();
+        self.executor_service.start();
     }
 }
diff --git a/execution/executor-service/src/test_utils.rs b/execution/executor-service/src/test_utils.rs
new file mode 100644
index 0000000000000..20b255296ec86
--- /dev/null
+++ b/execution/executor-service/src/test_utils.rs
@@ -0,0 +1,121 @@
+// Copyright © Aptos Foundation
+
+use aptos_block_partitioner::sharded_block_partitioner::ShardedBlockPartitioner;
+use aptos_language_e2e_tests::{
+    account::AccountData, common_transactions::peer_to_peer_txn, data_store::FakeDataStore,
+    executor::FakeExecutor,
+};
+use aptos_types::{
+    state_store::state_key::StateKeyInner,
+    transaction::{analyzed_transaction::AnalyzedTransaction, Transaction, TransactionOutput},
+};
+use aptos_vm::{
+    sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor},
+    AptosVM, VMExecutor,
+};
+use std::sync::Arc;
+
+fn generate_non_conflicting_sender_receiver(
+    executor: &mut FakeExecutor,
+) -> (AccountData, AccountData) {
+    let sender = executor.create_raw_account_data(3_000_000_000, 0);
+    let receiver = executor.create_raw_account_data(3_000_000_000, 0);
+    executor.add_account_data(&sender);
+    executor.add_account_data(&receiver);
+    (sender, receiver)
+}
+
+pub fn generate_non_conflicting_p2p(
+    executor: &mut FakeExecutor,
+) -> (AnalyzedTransaction, AccountData, AccountData) {
+    let (mut sender, receiver) = generate_non_conflicting_sender_receiver(executor);
+    let transfer_amount = 1_000;
+    let txn = generate_p2p_txn(&mut sender, &receiver, transfer_amount);
+    // execute transaction
+    (txn, sender, receiver)
+}
+
+pub fn generate_p2p_txn(
+    sender: &mut AccountData,
+    receiver: &AccountData,
+    transfer_amount: u64,
+) -> AnalyzedTransaction {
+    let txn = Transaction::UserTransaction(peer_to_peer_txn(
+        sender.account(),
+        receiver.account(),
+        sender.sequence_number(),
+        transfer_amount,
+        100,
+    ))
+    .into();
+    sender.increment_sequence_number();
+    txn
+}
+
+pub fn compare_txn_outputs(
+    unsharded_txn_output: Vec<TransactionOutput>,
+    sharded_txn_output: Vec<TransactionOutput>,
+) {
+    assert_eq!(unsharded_txn_output.len(), sharded_txn_output.len());
+    for i in 0..unsharded_txn_output.len() {
+        assert_eq!(
+            unsharded_txn_output[i].status(),
+            sharded_txn_output[i].status()
+        );
+        assert_eq!(
+            unsharded_txn_output[i].gas_used(),
+            sharded_txn_output[i].gas_used()
+        );
+        //assert_eq!(unsharded_txn_output[i].write_set(), sharded_txn_output[i].write_set());
+        assert_eq!(
+            unsharded_txn_output[i].events(),
+            sharded_txn_output[i].events()
+        );
+        // Global supply tracking for coin is not supported in sharded execution yet, so we filter
+        // out the table item from the write set, which has the global supply. This is a hack until
+        // we support global supply tracking in sharded execution.
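+        // Concretely: keep only AccessPath-keyed writes on both sides and compare those, as
+        // shown below.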
+        let unsharded_write_set_without_table_item = unsharded_txn_output[i]
+            .write_set()
+            .into_iter()
+            .filter(|(k, _)| matches!(k.inner(), &StateKeyInner::AccessPath(_)))
+            .collect::<Vec<_>>();
+        let sharded_write_set_without_table_item = sharded_txn_output[i]
+            .write_set()
+            .into_iter()
+            .filter(|(k, _)| matches!(k.inner(), &StateKeyInner::AccessPath(_)))
+            .collect::<Vec<_>>();
+        assert_eq!(
+            unsharded_write_set_without_table_item,
+            sharded_write_set_without_table_item
+        );
+    }
+}
+
+pub fn test_sharded_block_executor_no_conflict<E: ExecutorClient<FakeDataStore>>(
+    sharded_block_executor: ShardedBlockExecutor<FakeDataStore, E>,
+) {
+    let num_txns = 400;
+    let num_shards = sharded_block_executor.num_shards();
+    let mut executor = FakeExecutor::from_head_genesis();
+    let mut transactions = Vec::new();
+    for _ in 0..num_txns {
+        transactions.push(generate_non_conflicting_p2p(&mut executor).0)
+    }
+    let partitioner = ShardedBlockPartitioner::new(num_shards);
+    let partitioned_txns = partitioner.partition(transactions.clone(), 2, 0.9);
+    let sharded_txn_output = sharded_block_executor
+        .execute_block(
+            Arc::new(executor.data_store().clone()),
+            partitioned_txns,
+            2,
+            None,
+        )
+        .unwrap();
+    let unsharded_txn_output = AptosVM::execute_block(
+        transactions.into_iter().map(|t| t.into_txn()).collect(),
+        &executor.data_store(),
+        None,
+    )
+    .unwrap();
+    compare_txn_outputs(unsharded_txn_output, sharded_txn_output);
+}
diff --git a/execution/executor-service/src/tests.rs b/execution/executor-service/src/tests.rs
new file mode 100644
index 0000000000000..1542cfc036d9f
--- /dev/null
+++ b/execution/executor-service/src/tests.rs
@@ -0,0 +1,65 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{
+    remote_executor_client::RemoteExecutorClient, test_utils,
+    thread_executor_service::ThreadExecutorService,
+};
+use aptos_config::utils;
+use aptos_language_e2e_tests::data_store::FakeDataStore;
+use aptos_secure_net::network_controller::NetworkController;
+use aptos_vm::sharded_block_executor::ShardedBlockExecutor;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+pub fn create_thread_remote_executor_shards(
+    num_shards: usize,
+    num_threads: Option<usize>,
+) -> (
+    NetworkController,
+    RemoteExecutorClient<FakeDataStore>,
+    Vec<ThreadExecutorService>,
+) {
+    // First create the coordinator.
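+    // Then reserve one port per shard, start a ThreadExecutorService per shard, and finally
+    // build the RemoteExecutorClient the coordinator uses to talk to all of the shards.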
+    let listen_port = utils::get_available_port();
+    let coordinator_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), listen_port);
+    let mut controller = NetworkController::new(
+        "remote-executor-coordinator".to_string(),
+        coordinator_address,
+        5000,
+    );
+    let remote_shard_addresses = (0..num_shards)
+        .map(|_| {
+            let listen_port = utils::get_available_port();
+            SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), listen_port)
+        })
+        .collect::<Vec<_>>();
+
+    let num_threads =
+        num_threads.unwrap_or_else(|| (num_cpus::get() as f64 / num_shards as f64).ceil() as usize);
+
+    let remote_executor_services = (0..num_shards)
+        .map(|shard_id| {
+            ThreadExecutorService::new(
+                shard_id,
+                num_shards,
+                num_threads,
+                coordinator_address,
+                remote_shard_addresses.clone(),
+            )
+        })
+        .collect::<Vec<_>>();
+
+    let remote_executor_client =
+        RemoteExecutorClient::new(remote_shard_addresses, &mut controller, None);
+    (controller, remote_executor_client, remote_executor_services)
+}
+
+#[test]
+fn test_sharded_block_executor_no_conflict() {
+    let num_shards = 8;
+    let (mut controller, executor_client, _executor_services) =
+        create_thread_remote_executor_shards(num_shards, Some(2));
+    controller.start();
+    let sharded_block_executor = ShardedBlockExecutor::new(executor_client);
+    test_utils::test_sharded_block_executor_no_conflict(sharded_block_executor);
+}
diff --git a/execution/executor-service/src/thread_executor_service.rs b/execution/executor-service/src/thread_executor_service.rs
index 92e4672192d76..a067e934d8ade 100644
--- a/execution/executor-service/src/thread_executor_service.rs
+++ b/execution/executor-service/src/thread_executor_service.rs
@@ -1,62 +1,46 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
-use crate::{
-    remote_executor_service,
-    remote_executor_service::{ExecutorService, RemoteExecutorService},
-};
-use aptos_config::utils;
-use aptos_logger::info;
-use aptos_secure_net::NetworkServer;
-use std::{
-    net::{IpAddr, Ipv4Addr, SocketAddr},
-    thread,
-    thread::JoinHandle,
-};
+use crate::remote_executor_service::ExecutorService;
+use aptos_types::block_executor::partitioner::ShardId;
+use std::{net::SocketAddr, thread, thread::JoinHandle};
 
 /// This is a simple implementation of RemoteExecutorService that runs the executor service in a
 /// separate thread. This should be used for testing only.
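+/// The per-shard service thread is spawned once and never joined; its handle is only
+/// retained in the `_child` field below.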
 pub struct ThreadExecutorService {
     _child: JoinHandle<()>,
-    server_addr: SocketAddr,
-    network_timeout_ms: u64,
-    num_executor_threads: usize,
+    _self_address: SocketAddr,
 }
 
 impl ThreadExecutorService {
-    pub fn new(network_timeout_ms: u64, num_executor_threads: usize) -> Self {
-        let listen_port = utils::get_available_port();
-        let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), listen_port);
-        let server_addr = listen_addr;
-        info!("Starting thread remote executor service on {}", listen_addr);
-
-        let network_server =
-            NetworkServer::new("thread-executor-service", listen_addr, network_timeout_ms);
-
-        let executor_service = ExecutorService::new(num_executor_threads);
-
-        let child = thread::spawn(move || {
-            remote_executor_service::execute(network_server, executor_service);
-        });
+    pub fn new(
+        shard_id: ShardId,
+        num_shards: usize,
+        num_threads: usize,
+        coordinator_address: SocketAddr,
+        remote_shard_addresses: Vec<SocketAddr>,
+    ) -> Self {
+        let self_address = remote_shard_addresses[shard_id];
+        let mut executor_service = ExecutorService::new(
+            shard_id,
+            num_shards,
+            num_threads,
+            self_address,
+            coordinator_address,
+            remote_shard_addresses,
+        );
+
+        let thread_name = format!("ThreadExecutorService-{}", shard_id);
+        let builder = thread::Builder::new().name(thread_name);
+
+        let child = builder
+            .spawn(move || {
+                executor_service.start();
+            })
+            .expect("Failed to spawn thread");
 
         Self {
             _child: child,
-            server_addr,
-            network_timeout_ms,
-            num_executor_threads,
+            _self_address: self_address,
         }
     }
 }
-
-impl RemoteExecutorService for ThreadExecutorService {
-    fn server_address(&self) -> SocketAddr {
-        self.server_addr
-    }
-
-    fn network_timeout_ms(&self) -> u64 {
-        self.network_timeout_ms
-    }
-
-    fn executor_threads(&self) -> usize {
-        self.num_executor_threads
-    }
-}
diff --git a/execution/executor/src/components/chunk_output.rs b/execution/executor/src/components/chunk_output.rs
index 2e34bcc36e644..9d94d7b83561d 100644
--- a/execution/executor/src/components/chunk_output.rs
+++ b/execution/executor/src/components/chunk_output.rs
@@ -24,7 +24,8 @@ use aptos_types::{
 };
 use aptos_vm::{
     sharded_block_executor::{
-        sharded_executor_client::ShardedExecutorClient, ShardedBlockExecutor,
+        local_executor_shard::{LocalExecutorClient, LocalExecutorService},
+        ShardedBlockExecutor,
     },
     AptosVM, VMExecutor,
 };
@@ -33,12 +34,12 @@ use move_core_types::vm_status::StatusCode;
 use once_cell::sync::Lazy;
 use std::{ops::Deref, sync::Arc, time::Duration};
 
-pub static SHARDED_BLOCK_EXECUTOR: Lazy<Arc<Mutex<ShardedBlockExecutor<ShardedExecutorClient>>>> =
-    Lazy::new(|| {
-        let executor_clients =
-            ShardedExecutorClient::create_sharded_executor_clients(AptosVM::get_num_shards(), None);
-        Arc::new(Mutex::new(ShardedBlockExecutor::new(executor_clients)))
-    });
+pub static SHARDED_BLOCK_EXECUTOR: Lazy<
+    Arc<Mutex<ShardedBlockExecutor<CachedStateView, LocalExecutorClient<CachedStateView>>>>,
+> = Lazy::new(|| {
+    let client = LocalExecutorService::setup_local_executor_shards(AptosVM::get_num_shards(), None);
+    Arc::new(Mutex::new(ShardedBlockExecutor::new(client)))
+});
 
 pub struct ChunkOutput {
     /// Input transactions.
diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs
index 4fe95db5b8c54..1cca3dfd88c41 100644
--- a/execution/executor/src/fuzzing.rs
+++ b/execution/executor/src/fuzzing.rs
@@ -23,7 +23,10 @@ use aptos_types::{
     },
     vm_status::VMStatus,
 };
-use aptos_vm::{sharded_block_executor::ShardedBlockExecutor, VMExecutor};
+use aptos_vm::{
+    sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor},
+    VMExecutor,
+};
 use std::sync::Arc;
 
 fn create_test_executor() -> BlockExecutor<FakeVM> {
@@ -69,8 +72,8 @@ impl TransactionBlockExecutor for FakeVM {
 }
 
 impl VMExecutor for FakeVM {
-    fn execute_block_sharded<S: StateView + Sync + Send + 'static>(
-        _sharded_block_executor: &ShardedBlockExecutor<S>,
+    fn execute_block_sharded<S: StateView + Sync + Send + 'static, E: ExecutorClient<S>>(
+        _sharded_block_executor: &ShardedBlockExecutor<S, E>,
         _block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
         _state_view: Arc<S>,
         _maybe_block_gas_limit: Option<u64>,
diff --git a/execution/executor/src/mock_vm/mod.rs b/execution/executor/src/mock_vm/mod.rs
index 0856f5b69eee2..53e93a867dbcc 100644
--- a/execution/executor/src/mock_vm/mod.rs
+++ b/execution/executor/src/mock_vm/mod.rs
@@ -31,7 +31,10 @@ use aptos_types::{
     vm_status::{StatusCode, VMStatus},
     write_set::{WriteOp, WriteSet, WriteSetMut},
 };
-use aptos_vm::{sharded_block_executor::ShardedBlockExecutor, VMExecutor};
+use aptos_vm::{
+    sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor},
+    VMExecutor,
+};
 use move_core_types::{language_storage::TypeTag, move_resource::MoveResource};
 use once_cell::sync::Lazy;
 use std::{collections::HashMap, sync::Arc};
@@ -207,8 +210,8 @@ impl VMExecutor for MockVM {
         Ok(outputs)
     }
 
-    fn execute_block_sharded<S: StateView + Sync + Send + 'static>(
-        _sharded_block_executor: &ShardedBlockExecutor<S>,
+    fn execute_block_sharded<S: StateView + Sync + Send + 'static, E: ExecutorClient<S>>(
+        _sharded_block_executor: &ShardedBlockExecutor<S, E>,
         _block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
         _state_view: Arc<S>,
        _maybe_block_gas_limit: Option<u64>,
diff --git a/secure/net/Cargo.toml b/secure/net/Cargo.toml
index 60c7fcc6d056b..5bd47aa7eb868 100644
--- a/secure/net/Cargo.toml
+++ b/secure/net/Cargo.toml
@@ -15,6 +15,9 @@ rust-version = { workspace = true }
 [dependencies]
 aptos-logger = { workspace = true }
 aptos-metrics-core = { workspace = true }
+aptos-retrier = { workspace = true }
+bcs = { workspace = true }
+crossbeam-channel = { workspace = true }
 once_cell = { workspace = true }
 serde = { workspace = true }
 thiserror = { workspace = true }
diff --git a/secure/net/src/lib.rs b/secure/net/src/lib.rs
index da0f54b594851..ce603a7a2f518 100644
--- a/secure/net/src/lib.rs
+++ b/secure/net/src/lib.rs
@@ -16,6 +16,8 @@
 //! Internally both the client and server leverage a NetworkStream that communicates in blocks
 //! where a block is a length prefixed array of bytes.
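+//!
+//! The `network_controller` module added below layers message-type routing on top of
+//! NetworkClient/NetworkServer: callers register a message type and get a crossbeam
+//! channel end, while dedicated inbound and outbound threads move the bytes.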
+pub mod network_controller;
+
 use aptos_logger::{info, trace, warn, Schema};
 use aptos_metrics_core::{register_int_counter_vec, IntCounterVec};
 use once_cell::sync::Lazy;
@@ -29,7 +31,7 @@ use thiserror::Error;
 
 #[derive(Schema)]
 struct SecureNetLogSchema<'a> {
-    service: &'static str,
+    service: &'a str,
     mode: NetworkMode,
     event: LogEvent,
     #[schema(debug)]
@@ -39,7 +41,7 @@
 }
 
 impl<'a> SecureNetLogSchema<'a> {
-    fn new(service: &'static str, mode: NetworkMode, event: LogEvent) -> Self {
+    fn new(service: &'a str, mode: NetworkMode, event: LogEvent) -> Self {
         Self {
             service,
             mode,
@@ -122,12 +124,7 @@
     }
 }
 
-fn increment_counter(
-    service: &'static str,
-    mode: NetworkMode,
-    method: Method,
-    result: MethodResult,
-) {
+fn increment_counter(service: &str, mode: NetworkMode, method: Method, result: MethodResult) {
     EVENT_COUNTER
         .with_label_values(&[service, mode.as_str(), method.as_str(), result.as_str()])
         .inc()
@@ -150,7 +147,7 @@
 }
 
 pub struct NetworkClient {
-    service: &'static str,
+    service: String,
     server: SocketAddr,
     stream: Option<NetworkStream>,
     /// Read, Write, Connect timeout in milliseconds.
@@ -158,7 +155,7 @@
 }
 
 impl NetworkClient {
-    pub fn new(service: &'static str, server: SocketAddr, timeout_ms: u64) -> Self {
+    pub fn new(service: String, server: SocketAddr, timeout_ms: u64) -> Self {
         Self {
             service,
             server,
@@ -168,7 +165,7 @@
     }
 
     fn increment_counter(&self, method: Method, result: MethodResult) {
-        increment_counter(self.service, NetworkMode::Client, method, result)
+        increment_counter(&self.service, NetworkMode::Client, method, result)
     }
 
     /// Blocking read until able to successfully read an entire message
@@ -179,7 +176,7 @@
         if let Err(err) = &result {
             self.increment_counter(Method::Read, MethodResult::Failure);
             warn!(SecureNetLogSchema::new(
-                self.service,
+                &self.service,
                 NetworkMode::Client,
                 LogEvent::DisconnectedPeerOnRead,
             )
@@ -196,7 +193,7 @@
     /// Shutdown the internal network stream
     pub fn shutdown(&mut self) -> Result<(), Error> {
         info!(SecureNetLogSchema::new(
-            self.service,
+            &self.service,
             NetworkMode::Client,
             LogEvent::Shutdown,
         ));
@@ -214,7 +211,7 @@
         if let Err(err) = &result {
             self.increment_counter(Method::Write, MethodResult::Failure);
             warn!(SecureNetLogSchema::new(
-                self.service,
+                &self.service,
                 NetworkMode::Client,
                 LogEvent::DisconnectedPeerOnWrite,
             )
@@ -232,7 +229,7 @@
         if self.stream.is_none() {
             self.increment_counter(Method::Connect, MethodResult::Query);
             info!(SecureNetLogSchema::new(
-                self.service,
+                &self.service,
                 NetworkMode::Client,
                 LogEvent::ConnectionAttempt,
             )
@@ -245,7 +242,7 @@
             while let Err(err) = stream {
                 self.increment_counter(Method::Connect, MethodResult::Failure);
                 warn!(SecureNetLogSchema::new(
-                    self.service,
+                    &self.service,
                     NetworkMode::Client,
                     LogEvent::ConnectionFailed,
                 )
@@ -261,7 +258,7 @@
             self.stream = Some(NetworkStream::new(stream, self.server, self.timeout_ms));
             self.increment_counter(Method::Connect, MethodResult::Success);
             info!(SecureNetLogSchema::new(
-                self.service,
+                &self.service,
                 NetworkMode::Client,
                 LogEvent::ConnectionSuccessful,
             )
@@ -273,7 +270,7 @@
 }
 
 pub struct NetworkServer {
-    service: &'static str,
+    service: String,
     listener: Option<TcpListener>,
     stream: Option<NetworkStream>,
     /// Read, Write, Connect timeout in milliseconds.
@@ -281,7 +278,7 @@ pub struct NetworkServer {
 }
 
 impl NetworkServer {
-    pub fn new(service: &'static str, listen: SocketAddr, timeout_ms: u64) -> Self {
+    pub fn new(service: String, listen: SocketAddr, timeout_ms: u64) -> Self {
         let listener = TcpListener::bind(listen);
         Self {
             service,
@@ -292,7 +289,7 @@
     }
 
     fn increment_counter(&self, method: Method, result: MethodResult) {
-        increment_counter(self.service, NetworkMode::Server, method, result)
+        increment_counter(&self.service, NetworkMode::Server, method, result)
     }
 
     /// If there isn't already a downstream client, it accepts. Otherwise it
@@ -308,7 +305,7 @@
         if let Err((remote, err)) = &result {
             self.increment_counter(Method::Read, MethodResult::Failure);
             warn!(SecureNetLogSchema::new(
-                self.service,
+                &self.service,
                 NetworkMode::Server,
                 LogEvent::DisconnectedPeerOnRead,
             )
@@ -326,7 +323,7 @@
     /// Shutdown the internal network stream
     pub fn shutdown(&mut self) -> Result<(), Error> {
         info!(SecureNetLogSchema::new(
-            self.service,
+            &self.service,
             NetworkMode::Server,
             LogEvent::Shutdown,
         ));
@@ -350,7 +347,7 @@
         if let Err((remote, err)) = &result {
             self.increment_counter(Method::Write, MethodResult::Failure);
             warn!(SecureNetLogSchema::new(
-                self.service,
+                &self.service,
                 NetworkMode::Server,
                 LogEvent::DisconnectedPeerOnWrite,
             )
@@ -369,7 +366,7 @@
         if self.stream.is_none() {
             self.increment_counter(Method::Connect, MethodResult::Query);
             info!(SecureNetLogSchema::new(
-                self.service,
+                &self.service,
                 NetworkMode::Server,
                 LogEvent::ConnectionAttempt,
             ));
@@ -382,7 +379,7 @@
                 self.increment_counter(Method::Connect, MethodResult::Failure);
                 let err = err.into();
                 warn!(SecureNetLogSchema::new(
-                    self.service,
+                    &self.service,
                     NetworkMode::Server,
                     LogEvent::ConnectionSuccessful,
                 )
@@ -393,7 +390,7 @@
 
         self.increment_counter(Method::Connect, MethodResult::Success);
         info!(SecureNetLogSchema::new(
-            self.service,
+            &self.service,
             NetworkMode::Server,
             LogEvent::ConnectionSuccessful,
         )
@@ -528,8 +525,8 @@ mod test {
     fn test_ping() {
         let server_port = utils::get_available_port();
         let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port);
-        let mut server = NetworkServer::new("test", server_addr, TIMEOUT);
-        let mut client = NetworkClient::new("test", server_addr, TIMEOUT);
+        let mut server = NetworkServer::new("test".to_string(), server_addr, TIMEOUT);
+        let mut client = NetworkClient::new("test".to_string(), server_addr, TIMEOUT);
 
         let data = vec![0, 1, 2, 3];
         client.write(&data).unwrap();
@@ -546,8 +543,8 @@
     fn test_client_shutdown() {
         let server_port = utils::get_available_port();
         let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port);
-        let mut server = NetworkServer::new("test", server_addr, TIMEOUT);
-        let mut client = NetworkClient::new("test", server_addr, TIMEOUT);
+        let mut server = NetworkServer::new("test".to_string(), server_addr, TIMEOUT);
+        let mut client = NetworkClient::new("test".to_string(), server_addr, TIMEOUT);
 
         let data = vec![0, 1, 2, 3];
         client.write(&data).unwrap();
         let result = server.read().unwrap();
         assert_eq!(data, result);
 
         client.shutdown().unwrap();
-        let mut client = NetworkClient::new("test", server_addr, TIMEOUT);
+        let mut client = NetworkClient::new("test".to_string(), server_addr, TIMEOUT);
 
         assert!(server.read().is_err());
         let data = vec![4, 5, 6, 7];
@@ -568,8 +565,8 @@
     fn test_server_shutdown() {
         let server_port = utils::get_available_port();
         let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port);
-        let mut server = NetworkServer::new("test", server_addr, TIMEOUT);
-        let mut client = NetworkClient::new("test", server_addr, TIMEOUT);
+        let mut server = NetworkServer::new("test".to_string(), server_addr, TIMEOUT);
+        let mut client = NetworkClient::new("test".to_string(), server_addr, TIMEOUT);
 
         let data = vec![0, 1, 2, 3];
         client.write(&data).unwrap();
@@ -577,7 +574,7 @@
         assert_eq!(data, result);
 
         server.shutdown().unwrap();
-        let mut server = NetworkServer::new("test", server_addr, TIMEOUT);
+        let mut server = NetworkServer::new("test".to_string(), server_addr, TIMEOUT);
         let data = vec![4, 5, 6, 7];
 
         // We aren't notified immediately that a server has shutdown, but it happens eventually
@@ -593,8 +590,8 @@
     fn test_write_two_messages_buffered() {
         let server_port = utils::get_available_port();
         let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port);
-        let mut server = NetworkServer::new("test", server_addr, TIMEOUT);
-        let mut client = NetworkClient::new("test", server_addr, TIMEOUT);
+        let mut server = NetworkServer::new("test".to_string(), server_addr, TIMEOUT);
+        let mut client = NetworkClient::new("test".to_string(), server_addr, TIMEOUT);
         let data1 = vec![0, 1, 2, 3];
         let data2 = vec![4, 5, 6, 7];
         client.write(&data1).unwrap();
@@ -609,8 +606,8 @@
     fn test_server_timeout() {
         let server_port = utils::get_available_port();
         let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port);
-        let mut server = NetworkServer::new("test", server_addr, TIMEOUT);
-        let mut client = NetworkClient::new("test", server_addr, TIMEOUT);
+        let mut server = NetworkServer::new("test".to_string(), server_addr, TIMEOUT);
+        let mut client = NetworkClient::new("test".to_string(), server_addr, TIMEOUT);
 
         let data1 = vec![0, 1, 2, 3];
         let data2 = vec![4, 5, 6, 7];
@@ -624,7 +621,7 @@
         // New client, success, note the previous client connection is still active, the server is
         // actively letting it go due to lack of activity.
-        let mut client2 = NetworkClient::new("test", server_addr, TIMEOUT);
+        let mut client2 = NetworkClient::new("test".to_string(), server_addr, TIMEOUT);
         client2.write(&data2).unwrap();
         let result2 = server.read().unwrap();
         assert_eq!(data2, result2);
@@ -634,8 +631,8 @@
     fn test_client_timeout() {
         let server_port = utils::get_available_port();
         let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port);
-        let mut server = NetworkServer::new("test", server_addr, TIMEOUT);
-        let mut client = NetworkClient::new("test", server_addr, TIMEOUT);
+        let mut server = NetworkServer::new("test".to_string(), server_addr, TIMEOUT);
+        let mut client = NetworkClient::new("test".to_string(), server_addr, TIMEOUT);
 
         let data1 = vec![0, 1, 2, 3];
         let data2 = vec![4, 5, 6, 7];
@@ -650,7 +647,7 @@
         // Clean up old Server listener but keep the stream online. Start a new server, which will
         // be the one the client now connects to.
         server.listener = None;
-        let mut server2 = NetworkServer::new("test", server_addr, TIMEOUT);
+        let mut server2 = NetworkServer::new("test".to_string(), server_addr, TIMEOUT);
 
         // Client starts a new stream, success
         client.write(&data2).unwrap();
diff --git a/secure/net/src/network_controller/error.rs b/secure/net/src/network_controller/error.rs
new file mode 100644
index 0000000000000..98f5a2c21e864
--- /dev/null
+++ b/secure/net/src/network_controller/error.rs
@@ -0,0 +1,41 @@
+// Copyright © Aptos Foundation
+// Parts of the project are originally copyright © Meta Platforms, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::network_controller;
+use crossbeam_channel::{RecvError, SendError};
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+
+#[derive(Clone, Debug, Deserialize, Error, PartialEq, Eq, Serialize)]
+/// Different reasons why the executor service fails to execute a block.
+pub enum Error {
+    #[error("Internal error: {0}")]
+    InternalError(String),
+    #[error("Serialization error: {0}")]
+    SerializationError(String),
+}
+
+impl From<SendError<network_controller::Message>> for Error {
+    fn from(error: SendError<network_controller::Message>) -> Self {
+        Self::InternalError(error.to_string())
+    }
+}
+
+impl From<RecvError> for Error {
+    fn from(error: RecvError) -> Self {
+        Self::InternalError(error.to_string())
+    }
+}
+
+impl From<bcs::Error> for Error {
+    fn from(error: bcs::Error) -> Self {
+        Self::SerializationError(format!("{}", error))
+    }
+}
+
+impl From<crate::Error> for Error {
+    fn from(error: crate::Error) -> Self {
+        Self::InternalError(error.to_string())
+    }
+}
diff --git a/secure/net/src/network_controller/inbound_handler.rs b/secure/net/src/network_controller/inbound_handler.rs
new file mode 100644
index 0000000000000..24b461314f948
--- /dev/null
+++ b/secure/net/src/network_controller/inbound_handler.rs
@@ -0,0 +1,97 @@
+// Copyright © Aptos Foundation
+
+use crate::{
+    network_controller::{error::Error, Message, MessageType, NetworkMessage},
+    NetworkServer,
+};
+use aptos_logger::error;
+use crossbeam_channel::Sender;
+use std::{
+    collections::HashMap,
+    net::SocketAddr,
+    sync::{Arc, Mutex},
+    thread,
+};
+
+pub struct InboundHandler {
+    service: String,
+    server: Arc<Mutex<NetworkServer>>,
+    // Used to route incoming messages to correct channel.
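+    // (One Sender<Message> per registered MessageType; see register_handler below.)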
+    inbound_handlers: Arc<Mutex<HashMap<MessageType, Sender<Message>>>>,
+}
+
+impl InboundHandler {
+    pub fn new(service: String, listen_addr: SocketAddr, timeout_ms: u64) -> Self {
+        Self {
+            service: service.clone(),
+            server: Arc::new(Mutex::new(NetworkServer::new(
+                service,
+                listen_addr,
+                timeout_ms,
+            ))),
+            inbound_handlers: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    pub fn register_handler(&self, message_type: String, sender: Sender<Message>) {
+        assert!(!self
+            .inbound_handlers
+            .lock()
+            .unwrap()
+            .contains_key(&MessageType::new(message_type.clone())));
+        let mut inbound_handlers = self.inbound_handlers.lock().unwrap();
+        inbound_handlers.insert(MessageType::new(message_type), sender);
+    }
+
+    pub fn start(&mut self) {
+        let inbound_handlers = self.inbound_handlers.clone(); // Clone the hashmap for the thread
+        let server_clone = self.server.clone(); // Clone the server to move into the thread
+        // Spawn a thread to handle incoming messages
+        let thread_name = format!("{}_network_inbound_handler", self.service);
+        let builder = thread::Builder::new().name(thread_name);
+        builder
+            .spawn(move || {
+                loop {
+                    // Receive incoming messages from the server
+                    if let Err(e) =
+                        Self::process_one_incoming_message(&server_clone, &inbound_handlers)
+                    {
+                        error!("Error processing incoming messages: {:?}", e);
+                    }
+                }
+            })
+            .expect("Failed to spawn network_inbound_handler thread");
+    }
+
+    // Helper function to short-circuit the network message not to be sent over the network for self messages
+    pub fn send_incoming_message_to_handler(&self, message_type: &MessageType, message: Message) {
+        // Check if there is a registered handler for the sender
+        if let Some(handler) = self.inbound_handlers.lock().unwrap().get(message_type) {
+            // Send the message to the registered handler
+            handler.send(message).unwrap();
+        } else {
+            println!("No handler registered for message type: {:?}", message_type);
+        }
+    }
+
+    fn process_one_incoming_message(
+        network_server: &Arc<Mutex<NetworkServer>>,
+        inbound_handlers: &Arc<Mutex<HashMap<MessageType, Sender<Message>>>>,
+    ) -> Result<(), Error> {
+        let message = network_server.lock().unwrap().read()?;
+        let network_msg: NetworkMessage = bcs::from_bytes(&message)?;
+        // Get the sender's SocketAddr from the received message
+        let sender = network_msg.sender;
+        let msg = network_msg.message;
+        let message_type = network_msg.message_type;
+
+        // Check if there is a registered handler for the sender
+        if let Some(handler) = inbound_handlers.lock().unwrap().get(&message_type) {
+            // Send the message to the registered handler
+            handler.send(msg)?;
+        } else {
+            println!("No handler registered for sender: {:?}", sender);
+        }
+        Ok(())
+    }
+}
diff --git a/secure/net/src/network_controller/mod.rs b/secure/net/src/network_controller/mod.rs
new file mode 100644
index 0000000000000..7a85174673275
--- /dev/null
+++ b/secure/net/src/network_controller/mod.rs
@@ -0,0 +1,163 @@
+// Copyright © Aptos Foundation
+
+use crate::network_controller::{
+    inbound_handler::InboundHandler, outbound_handler::OutboundHandler,
+};
+use crossbeam_channel::{unbounded, Receiver, Sender};
+use serde::{Deserialize, Serialize};
+use std::{
+    net::SocketAddr,
+    sync::{Arc, Mutex},
+};
+
+mod error;
+mod inbound_handler;
+mod outbound_handler;
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[allow(dead_code)]
+pub struct NetworkMessage {
+    pub sender: SocketAddr,
+    pub message: Message,
+    pub message_type: MessageType,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize, Eq, Hash, PartialEq)]
+#[allow(dead_code)]
+pub struct MessageType {
+    message_type: String,
+}
+
+impl MessageType {
+    pub fn new(message_type: String) -> Self {
+        Self { message_type }
+    }
+
+    pub fn get_type(&self) -> String {
+        self.message_type.clone()
+    }
+}
+
+impl NetworkMessage {
+    pub fn new(sender: SocketAddr, message: Message, message_type: MessageType) -> Self {
+        Self {
+            sender,
+            message,
+            message_type,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[allow(dead_code)]
+pub struct Message {
+    pub data: Vec<u8>,
+}
+
+impl Message {
+    pub fn new(data: Vec<u8>) -> Self {
+        Self { data }
+    }
+
+    pub fn to_bytes(self) -> Vec<u8> {
+        self.data
+    }
+}
+
+#[allow(dead_code)]
+pub struct NetworkController {
+    inbound_handler: Arc<Mutex<InboundHandler>>,
+    outbound_handler: OutboundHandler,
+}
+
+impl NetworkController {
+    pub fn new(service: String, listen_addr: SocketAddr, timeout_ms: u64) -> Self {
+        let inbound_handler = Arc::new(Mutex::new(InboundHandler::new(
+            service.clone(),
+            listen_addr,
+            timeout_ms,
+        )));
+        let outbound_handler = OutboundHandler::new(service, listen_addr, inbound_handler.clone());
+        Self {
+            inbound_handler,
+            outbound_handler,
+        }
+    }
+
+    pub fn create_outbound_channel(
+        &mut self,
+        remote_peer_addr: SocketAddr,
+        message_type: String,
+    ) -> Sender<Message> {
+        let (outbound_sender, outbound_receiver) = unbounded();
+
+        self.outbound_handler
+            .register_handler(message_type, remote_peer_addr, outbound_receiver);
+
+        outbound_sender
+    }
+
+    pub fn create_inbound_channel(&mut self, message_type: String) -> Receiver<Message> {
+        let (inbound_sender, inbound_receiver) = unbounded();
+
+        self.inbound_handler
+            .lock()
+            .unwrap()
+            .register_handler(message_type, inbound_sender);
+
+        inbound_receiver
+    }
+
+    pub fn start(&mut self) {
+        self.inbound_handler.lock().unwrap().start();
+        self.outbound_handler.start();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::network_controller::{Message, NetworkController};
+    use aptos_config::utils;
+    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+
+    #[test]
+    fn test_basic_send_receive() {
+        let server_port1 = utils::get_available_port();
+        let server_addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port1);
+
+        let server_port2 = utils::get_available_port();
+        let server_addr2 = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), server_port2);
+
+        let mut network_controller1 =
+            NetworkController::new("test1".to_string(), server_addr1, 1000);
+        let mut network_controller2 =
+            NetworkController::new("test2".to_string(), server_addr2, 1000);
+
+        let test1_sender =
+            network_controller2.create_outbound_channel(server_addr1, "test1".to_string());
+        let test1_receiver = network_controller1.create_inbound_channel("test1".to_string());
+
+        let test2_sender =
+            network_controller1.create_outbound_channel(server_addr2, "test2".to_string());
+        let test2_receiver = network_controller2.create_inbound_channel("test2".to_string());
+
+        network_controller1.start();
+        network_controller2.start();
+
+        let test1_message = "test1".as_bytes().to_vec();
+        test1_sender
+            .send(Message::new(test1_message.clone()))
+            .unwrap();
+
+        let test2_message = "test2".as_bytes().to_vec();
+        test2_sender
+            .send(Message::new(test2_message.clone()))
+            .unwrap();
+
+        let received_test1_message = test1_receiver.recv().unwrap();
+        assert_eq!(received_test1_message.data, test1_message);
+
+        let received_test2_message = test2_receiver.recv().unwrap();
+        assert_eq!(received_test2_message.data, test2_message);
+    }
+}
diff --git a/secure/net/src/network_controller/outbound_handler.rs b/secure/net/src/network_controller/outbound_handler.rs
new file mode 100644
index 0000000000000..dc1e4f44544b4
--- /dev/null
+++ b/secure/net/src/network_controller/outbound_handler.rs
@@ -0,0 +1,112 @@
+// Copyright © Aptos Foundation
+
+use crate::{
+    network_controller::{inbound_handler::InboundHandler, Message, MessageType, NetworkMessage},
+    NetworkClient,
+};
+use aptos_retrier::{fixed_retry_strategy, retry};
+use crossbeam_channel::{Receiver, Select};
+use std::{
+    collections::HashMap,
+    net::SocketAddr,
+    sync::{Arc, Mutex},
+    thread,
+};
+
+pub struct OutboundHandler {
+    service: String,
+    network_clients: Arc<Mutex<HashMap<SocketAddr, NetworkClient>>>,
+    address: SocketAddr,
+    // Used to route outgoing messages to correct network client with the correct message type
+    handlers: Arc<Mutex<Vec<(Receiver<Message>, SocketAddr, MessageType)>>>,
+    inbound_handler: Arc<Mutex<InboundHandler>>,
+}
+
+impl OutboundHandler {
+    pub fn new(
+        service: String,
+        listen_addr: SocketAddr,
+        inbound_handler: Arc<Mutex<InboundHandler>>,
+    ) -> Self {
+        Self {
+            service,
+            network_clients: Arc::new(Mutex::new(HashMap::new())),
+            address: listen_addr,
+            handlers: Arc::new(Mutex::new(Vec::new())),
+            inbound_handler,
+        }
+    }
+
+    pub fn register_handler(
+        &self,
+        message_type: String,
+        remote_addr: SocketAddr,
+        receiver: Receiver<Message>,
+    ) {
+        // Create a remote client if it doesn't exist
+        self.network_clients
+            .lock()
+            .unwrap()
+            .entry(remote_addr)
+            .or_insert_with(|| NetworkClient::new(message_type.clone(), remote_addr, 5000));
+        let mut handlers = self.handlers.lock().unwrap();
+        handlers.push((receiver, remote_addr, MessageType::new(message_type)));
+    }
+
+    pub fn start(&mut self) {
+        let outbound_handlers = self.handlers.clone();
+        let address = self.address;
+        let network_clients = self.network_clients.clone();
+        let thread_name = format!("{}_network_outbound_handler", self.service);
+        let builder = thread::Builder::new().name(thread_name);
+        let inbound_handler = self.inbound_handler.clone();
+        builder
+            .spawn(move || loop {
+                Self::process_one_outgoing_message(
+                    outbound_handlers.clone(),
+                    network_clients.clone(),
+                    &address,
+                    inbound_handler.clone(),
+                )
+            })
+            .expect("Failed to spawn outbound handler thread");
+    }
+
+    fn process_one_outgoing_message(
+        outbound_handlers: Arc<Mutex<Vec<(Receiver<Message>, SocketAddr, MessageType)>>>,
+        network_clients: Arc<Mutex<HashMap<SocketAddr, NetworkClient>>>,
+        socket_addr: &SocketAddr,
+        inbound_handler: Arc<Mutex<InboundHandler>>,
+    ) {
+        let mut select = Select::new();
+        let handlers = outbound_handlers.lock().unwrap();
+
+        for (receiver, _, _) in handlers.iter() {
+            select.recv(receiver);
+        }
+        let oper = select.select();
+        let index = oper.index();
+        let msg = oper.recv(&handlers[index].0).unwrap();
+        let remote_addr = &handlers[index].1;
+        let message_type = &handlers[index].2;
+        if remote_addr == socket_addr {
+            // If the remote address is the same as the local address, then we are sending a message to ourselves
+            // so we should just pass it to the inbound handler
+            inbound_handler
+                .lock()
+                .unwrap()
+                .send_incoming_message_to_handler(message_type, msg);
+            return;
+        }
+        let mut binding = network_clients.lock().unwrap();
+        let network_client = binding.get_mut(remote_addr).unwrap();
+        let msg = bcs::to_bytes(&NetworkMessage::new(
+            *socket_addr,
+            msg,
+            message_type.clone(),
+        ))
+        .unwrap();
+
+        retry(fixed_retry_strategy(5, 20), || network_client.write(&msg)).unwrap();
+    }
+}
diff --git a/storage/state-view/src/in_memory_state_view.rs b/storage/state-view/src/in_memory_state_view.rs
index 519a9b31c01b3..36fe5c2d9d7c3 100644
--- a/storage/state-view/src/in_memory_state_view.rs
+++ b/storage/state-view/src/in_memory_state_view.rs
@@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 
 // A State view backed by in-memory hashmap.
-#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
+#[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct InMemoryStateView {
     state_data: HashMap<StateKey, StateValue>,
 }
diff --git a/types/src/transaction/analyzed_transaction.rs b/types/src/transaction/analyzed_transaction.rs
index 0d8a9212070be..30f436d9df66d 100644
--- a/types/src/transaction/analyzed_transaction.rs
+++ b/types/src/transaction/analyzed_transaction.rs
@@ -52,6 +52,13 @@ impl StorageLocation {
             _ => panic!("Cannot convert wildcard storage location to state key"),
         }
     }
+
+    pub fn state_key(&self) -> &StateKey {
+        match self {
+            StorageLocation::Specific(state_key) => state_key,
+            _ => panic!("Cannot convert wildcard storage location to state key"),
+        }
+    }
 }
 
 impl AnalyzedTransaction {