Skip to content

Commit

Permalink
[Sharding] Refactor sharded block executor for better remote client s…
Browse files Browse the repository at this point in the history
…upport (#9221)
  • Loading branch information
sitalkedia authored Jul 22, 2023
1 parent 8a22309 commit cf1accf
Show file tree
Hide file tree
Showing 39 changed files with 1,729 additions and 1,066 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 11 additions & 16 deletions aptos-move/aptos-transaction-benchmarks/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use aptos_bitvec::BitVec;
use aptos_block_executor::txn_commit_hook::NoOpTransactionCommitHook;
use aptos_block_partitioner::sharded_block_partitioner::ShardedBlockPartitioner;
use aptos_crypto::HashValue;
use aptos_executor_service::remote_executor_client::RemoteExecutorClient;
use aptos_language_e2e_tests::{
account_universe::{AUTransactionGen, AccountPickStyle, AccountUniverse, AccountUniverseGen},
data_store::FakeDataStore,
Expand All @@ -22,7 +21,10 @@ use aptos_types::{
use aptos_vm::{
block_executor::{AptosTransactionOutput, BlockAptosVM},
data_cache::AsMoveResolver,
sharded_block_executor::{block_executor_client::VMExecutorClient, ShardedBlockExecutor},
sharded_block_executor::{
local_executor_shard::{LocalExecutorClient, LocalExecutorService},
ShardedBlockExecutor,
},
};
use criterion::{measurement::Measurement, BatchSize, Bencher};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -192,7 +194,8 @@ struct TransactionBenchState<S> {
num_transactions: usize,
strategy: S,
account_universe: AccountUniverse,
parallel_block_executor: Option<Arc<ShardedBlockExecutor<FakeDataStore>>>,
parallel_block_executor:
Option<Arc<ShardedBlockExecutor<FakeDataStore, LocalExecutorClient<FakeDataStore>>>>,
block_partitioner: Option<ShardedBlockPartitioner>,
validator_set: ValidatorSet,
state_view: Arc<FakeDataStore>,
Expand Down Expand Up @@ -228,7 +231,8 @@ where
universe_strategy: impl Strategy<Value = AccountUniverseGen>,
num_transactions: usize,
num_executor_shards: usize,
remote_executor_addresses: Option<Vec<SocketAddr>>,
// TODO(skedia): add support for remote executor addresses.
_remote_executor_addresses: Option<Vec<SocketAddr>>,
) -> Self {
let mut runner = TestRunner::default();
let universe_gen = universe_strategy
Expand All @@ -246,18 +250,9 @@ where
let (parallel_block_executor, block_partitioner) = if num_executor_shards == 1 {
(None, None)
} else {
let parallel_block_executor =
if let Some(remote_executor_addresses) = remote_executor_addresses {
let remote_executor_clients = remote_executor_addresses
.into_iter()
.map(|addr| RemoteExecutorClient::new(addr, 10000))
.collect::<Vec<RemoteExecutorClient>>();
Arc::new(ShardedBlockExecutor::new(remote_executor_clients))
} else {
let local_executor_client =
VMExecutorClient::create_vm_clients(num_executor_shards, None);
Arc::new(ShardedBlockExecutor::new(local_executor_client))
};
let client =
LocalExecutorService::setup_local_executor_shards(num_executor_shards, None);
let parallel_block_executor = Arc::new(ShardedBlockExecutor::new(client));
(
Some(parallel_block_executor),
Some(ShardedBlockPartitioner::new(num_executor_shards)),
Expand Down
1 change: 1 addition & 0 deletions aptos-move/aptos-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ aptos-utils = { workspace = true }
aptos-vm-logging = { workspace = true }
aptos-vm-types = { workspace = true }
bcs = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
fail = { workspace = true }
futures = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
data_cache::StorageAdapter,
errors::expect_only_successful_execution,
move_vm_ext::{MoveResolverExt, RespawnedSession, SessionExt, SessionId},
sharded_block_executor::ShardedBlockExecutor,
sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor},
system_module_names::*,
transaction_metadata::TransactionMetadata,
verifier, VMExecutor, VMValidator,
Expand Down Expand Up @@ -1527,8 +1527,8 @@ impl VMExecutor for AptosVM {
ret
}

fn execute_block_sharded<S: StateView + Sync + Send + 'static>(
sharded_block_executor: &ShardedBlockExecutor<S>,
fn execute_block_sharded<S: StateView + Sync + Send + 'static, C: ExecutorClient<S>>(
sharded_block_executor: &ShardedBlockExecutor<S, C>,
transactions: Vec<SubBlocksForShard<AnalyzedTransaction>>,
state_view: Arc<S>,
maybe_block_gas_limit: Option<u64>,
Expand Down
6 changes: 3 additions & 3 deletions aptos-move/aptos-vm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ mod transaction_validation;
mod verifier;

pub use crate::aptos_vm::AptosVM;
use crate::sharded_block_executor::ShardedBlockExecutor;
use crate::sharded_block_executor::{executor_client::ExecutorClient, ShardedBlockExecutor};
use aptos_state_view::StateView;
use aptos_types::{
block_executor::partitioner::SubBlocksForShard,
Expand Down Expand Up @@ -161,8 +161,8 @@ pub trait VMExecutor: Send + Sync {
) -> Result<Vec<TransactionOutput>, VMStatus>;

/// Executes a block of transactions using a sharded block executor and returns the results.
fn execute_block_sharded<S: StateView + Sync + Send + 'static>(
sharded_block_executor: &ShardedBlockExecutor<S>,
fn execute_block_sharded<S: StateView + Sync + Send + 'static, E: ExecutorClient<S>>(
sharded_block_executor: &ShardedBlockExecutor<S, E>,
block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
state_view: Arc<S>,
maybe_block_gas_limit: Option<u64>,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::sharded_block_executor::ExecutorShardCommand;
use aptos_state_view::StateView;
use aptos_types::transaction::TransactionOutput;
use move_core_types::vm_status::VMStatus;

// Interface to communicate from the executor shards to the block executor coordinator.
pub trait CoordinatorClient<S: StateView + Sync + Send + 'static>: Send + Sync {
fn receive_execute_command(&self) -> ExecutorShardCommand<S>;

fn send_execution_result(&self, result: Result<Vec<Vec<TransactionOutput>>, VMStatus>);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,27 @@ use aptos_types::{
};
use std::{
collections::{HashMap, HashSet},
sync::{
mpsc::{Receiver, Sender},
Arc, Mutex,
},
sync::Arc,
};

pub struct CrossShardCommitReceiver {}

impl CrossShardCommitReceiver {
pub fn start<S: StateView + Sync + Send>(
cross_shard_state_view: Arc<CrossShardStateView<S>>,
message_rx: &Receiver<CrossShardMsg>,
cross_shard_client: Arc<dyn CrossShardClient>,
round: RoundId,
) {
loop {
let msg = message_rx.recv().unwrap();
let msg = cross_shard_client.receive_cross_shard_msg(round);
match msg {
CrossShardMsg::RemoteTxnWriteMsg(txn_commit_msg) => {
RemoteTxnWriteMsg(txn_commit_msg) => {
let (state_key, write_op) = txn_commit_msg.take();
cross_shard_state_view
.set_value(&state_key, write_op.and_then(|w| w.as_state_value()));
},
CrossShardMsg::StopMsg => {
trace!("Cross shard commit receiver stopped for round {}", round);
break;
},
}
Expand All @@ -50,8 +49,7 @@ impl CrossShardCommitReceiver {

pub struct CrossShardCommitSender {
shard_id: ShardId,
// The senders of cross-shard messages to other shards per round.
message_txs: Arc<Vec<Vec<Mutex<Sender<CrossShardMsg>>>>>,
cross_shard_client: Arc<dyn CrossShardClient>,
// The hashmap of source txn index to hashmap of conflicting storage location to the
// list shard id and round id. Please note that the transaction indices stored here is
// global indices, so we need to convert the local index received from the parallel execution to
Expand All @@ -65,7 +63,7 @@ pub struct CrossShardCommitSender {
impl CrossShardCommitSender {
pub fn new(
shard_id: ShardId,
message_txs: Arc<Vec<Vec<Mutex<Sender<CrossShardMsg>>>>>,
cross_shard_client: Arc<dyn CrossShardClient>,
sub_block: &SubBlock<AnalyzedTransaction>,
) -> Self {
let mut dependent_edges = HashMap::new();
Expand Down Expand Up @@ -98,7 +96,7 @@ impl CrossShardCommitSender {

Self {
shard_id,
message_txs,
cross_shard_client,
dependent_edges,
index_offset: sub_block.start_index as TxnIndex,
}
Expand All @@ -121,11 +119,11 @@ impl CrossShardCommitSender {
state_key.clone(),
Some(write_op.clone()),
));
self.message_txs[*dependent_shard_id][*round_id]
.lock()
.unwrap()
.send(message)
.unwrap();
self.cross_shard_client.send_cross_shard_msg(
*dependent_shard_id,
*round_id,
message,
);
}
}
}
Expand All @@ -146,3 +144,11 @@ impl TransactionCommitHook for CrossShardCommitSender {
todo!("on_transaction_aborted not supported for sharded execution yet")
}
}

// CrossShardClient is a trait that defines the interface for sending and receiving messages across
// shards.
pub trait CrossShardClient: Send + Sync {
fn send_cross_shard_msg(&self, shard_id: ShardId, round: RoundId, msg: CrossShardMsg);

fn receive_cross_shard_msg(&self, current_round: RoundId) -> CrossShardMsg;
}
28 changes: 28 additions & 0 deletions aptos-move/aptos-vm/src/sharded_block_executor/executor_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_state_view::StateView;
use aptos_types::{
block_executor::partitioner::SubBlocksForShard,
transaction::{analyzed_transaction::AnalyzedTransaction, TransactionOutput},
};
use move_core_types::vm_status::VMStatus;
use std::sync::Arc;

// Interface to communicate from the block executor coordinator to the executor shards.
pub trait ExecutorClient<S: StateView + Sync + Send + 'static>: Send + Sync {
fn num_shards(&self) -> usize;

// A non blocking call that sends the block to be executed by the executor shards.
fn execute_block(
&self,
state_view: Arc<S>,
block: Vec<SubBlocksForShard<AnalyzedTransaction>>,
concurrency_level_per_shard: usize,
maybe_block_gas_limit: Option<u64>,
);

// Blocking call that waits for the execution results from the executor shards. It returns the execution results
// from each shard and in the sub-block order.
fn get_execution_result(&self) -> Result<Vec<Vec<Vec<TransactionOutput>>>, VMStatus>;
}
Loading

0 comments on commit cf1accf

Please sign in to comment.