Skip to content

Commit

Permalink
[Execution] Add support for configurable transaction filtering (#10742)…
Browse files Browse the repository at this point in the history
… (#11034)
  • Loading branch information
sitalkedia authored Nov 21, 2023
1 parent 903e482 commit 9156cc7
Show file tree
Hide file tree
Showing 10 changed files with 492 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions config/src/config/execution_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::config::{
config_sanitizer::ConfigSanitizer, node_config_loader::NodeType, utils::RootPath, Error,
NodeConfig,
config_sanitizer::ConfigSanitizer, node_config_loader::NodeType,
transaction_filter_type::Filter, utils::RootPath, Error, NodeConfig,
};
use aptos_types::{chain_id::ChainId, transaction::Transaction};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -34,6 +34,8 @@ pub struct ExecutionConfig {
pub paranoid_hot_potato_verification: bool,
/// Enables enhanced metrics around processed transactions
pub processed_transactions_detailed_counters: bool,
/// Enables filtering of transactions before they are sent to execution
pub transaction_filter: Filter,
}

impl std::fmt::Debug for ExecutionConfig {
Expand Down Expand Up @@ -63,6 +65,7 @@ impl Default for ExecutionConfig {
paranoid_type_verification: true,
paranoid_hot_potato_verification: true,
processed_transactions_detailed_counters: false,
transaction_filter: Filter::empty(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod safety_rules_config;
mod secure_backend_config;
mod state_sync_config;
mod storage_config;
pub mod transaction_filter_type;
mod utils;

// All public usage statements should be declared below
Expand Down
193 changes: 193 additions & 0 deletions config/src/config/transaction_filter_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_crypto::HashValue;
use aptos_types::{
account_address::AccountAddress,
transaction::{SignedTransaction, TransactionPayload},
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
enum Matcher {
All,
BlockId(HashValue),
TransactionId(HashValue),
Sender(AccountAddress),
ModuleAddress(AccountAddress),
EntryFunction(AccountAddress, String, String),
}

impl Matcher {
fn matches(&self, block_id: HashValue, txn: &SignedTransaction) -> bool {
match self {
Matcher::All => true,
Matcher::BlockId(id) => block_id == *id,
Matcher::TransactionId(id) => txn.clone().committed_hash() == *id,
Matcher::Sender(sender) => txn.sender() == *sender,
Matcher::ModuleAddress(address) => match txn.payload() {
TransactionPayload::EntryFunction(entry_function) => {
*entry_function.module().address() == *address
},
_ => false,
},
Matcher::EntryFunction(address, module_name, function) => match txn.payload() {
TransactionPayload::EntryFunction(entry_function) => {
*entry_function.module().address() == *address
&& entry_function.module().name().to_string() == *module_name
&& entry_function.function().to_string() == *function
},
_ => false,
},
}
}
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
enum Rule {
Allow(Matcher),
Deny(Matcher),
}

enum EvalResult {
Allow,
Deny,
NoMatch,
}

impl Rule {
fn eval(&self, block_id: HashValue, txn: &SignedTransaction) -> EvalResult {
match self {
Rule::Allow(matcher) => {
if matcher.matches(block_id, txn) {
EvalResult::Allow
} else {
EvalResult::NoMatch
}
},
Rule::Deny(matcher) => {
if matcher.matches(block_id, txn) {
EvalResult::Deny
} else {
EvalResult::NoMatch
}
},
}
}
}

/// A filter that can be used to allow or deny transactions from being executed. It contains a set
/// of rules that are evaluated one by one in the order of declaration.
/// If a rule matches, the transaction is either allowed or
/// denied depending on the rule. If no rule matches, the transaction is allowed.
/// For example a rules might look like this:
/// rules:
/// - Allow:
/// Sender: f8871acf2c827d40e23b71f6ff2b9accef8dbb17709b88bd9eb95e6bb748c25a
/// - Allow:
/// ModuleAddress: "0000000000000000000000000000000000000000000000000000000000000001"
/// - Allow:
/// EntryFunction:
/// - "0000000000000000000000000000000000000000000000000000000000000001"
/// - test
/// - check
/// - Allow:
/// EntryFunction:
/// - "0000000000000000000000000000000000000000000000000000000000000001"
/// - test
/// - new
/// - Deny: All
/// This filter allows transactions from the sender with address f8871acf2c827d40e23b71f6ff2b9accef8dbb17709b88bd9eb95e6bb748c25a or
/// from the module with address 0000000000000000000000000000000000000000000000000000000000000001 or entry functions
/// test::check and test::new from the module 0000000000000000000000000000000000000000000000000000000000000001. All other transactions are denied.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct Filter {
rules: Vec<Rule>,
}

impl Filter {
pub fn empty() -> Self {
Self { rules: vec![] }
}

pub fn is_empty(&self) -> bool {
self.rules.is_empty()
}

pub fn add_deny_all(mut self) -> Self {
self.rules.push(Rule::Deny(Matcher::All));
self
}

pub fn add_deny_block_id(mut self, block_id: HashValue) -> Self {
self.rules.push(Rule::Deny(Matcher::BlockId(block_id)));
self
}

pub fn add_deny_transaction_id(mut self, txn_id: HashValue) -> Self {
self.rules.push(Rule::Deny(Matcher::TransactionId(txn_id)));
self
}

pub fn add_allow_sender(mut self, sender: AccountAddress) -> Self {
self.rules.push(Rule::Allow(Matcher::Sender(sender)));
self
}

pub fn add_deny_sender(mut self, sender: AccountAddress) -> Self {
self.rules.push(Rule::Deny(Matcher::Sender(sender)));
self
}

pub fn add_allow_module_address(mut self, address: AccountAddress) -> Self {
self.rules
.push(Rule::Allow(Matcher::ModuleAddress(address)));
self
}

pub fn add_deny_module_address(mut self, address: AccountAddress) -> Self {
self.rules.push(Rule::Deny(Matcher::ModuleAddress(address)));
self
}

pub fn add_deny_entry_function(
mut self,
address: AccountAddress,
module_name: String,
function: String,
) -> Self {
self.rules.push(Rule::Deny(Matcher::EntryFunction(
address,
module_name,
function,
)));
self
}

pub fn add_allow_entry_function(
mut self,
address: AccountAddress,
module_name: String,
function: String,
) -> Self {
self.rules.push(Rule::Allow(Matcher::EntryFunction(
address,
module_name,
function,
)));
self
}

pub fn allows(&self, block_id: HashValue, txn: &SignedTransaction) -> bool {
for rule in &self.rules {
// Rules are evaluated in the order and the first rule that matches is used. If no rule
// matches, the transaction is allowed.
match rule.eval(block_id, txn) {
EvalResult::Allow => return true,
EvalResult::Deny => return false,
EvalResult::NoMatch => continue,
}
}
true
}
}
2 changes: 2 additions & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ rayon = { workspace = true }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-metrics = { workspace = true }
tokio-retry = { workspace = true }

[dev-dependencies]
aptos = { workspace = true }
aptos-cached-packages = { workspace = true }
aptos-config = { workspace = true, features = ["fuzzing"] }
aptos-consensus-types = { workspace = true, features = ["fuzzing"] }
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/consensus_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
persistent_liveness_storage::StorageWriteProxy,
quorum_store::quorum_store_db::QuorumStoreDB,
state_computer::ExecutionProxy,
transaction_filter::TransactionFilter,
txn_notifier::MempoolNotifier,
util::time_service::ClockTimeService,
};
Expand Down Expand Up @@ -51,6 +52,7 @@ pub fn start_consensus(
txn_notifier,
state_sync_notifier,
runtime.handle(),
TransactionFilter::new(node_config.execution.transaction_filter.clone()),
));

let time_service = Arc::new(ClockTimeService::new(runtime.handle().clone()));
Expand Down
5 changes: 4 additions & 1 deletion consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::{
use anyhow::{bail, ensure, Context};
use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::config::{ConsensusConfig, NodeConfig};
use aptos_config::config::{ConsensusConfig, ExecutionConfig, NodeConfig};
use aptos_consensus_types::{
common::{Author, Round},
epoch_retrieval::EpochRetrievalRequest,
Expand Down Expand Up @@ -111,6 +111,7 @@ pub enum LivenessStorageData {
pub struct EpochManager<P: OnChainConfigProvider> {
author: Author,
config: ConsensusConfig,
execution_config: ExecutionConfig,
time_service: Arc<dyn TimeService>,
self_sender: aptos_channels::Sender<Event<ConsensusMsg>>,
network_sender: ConsensusNetworkClient<NetworkClient<ConsensusMsg>>,
Expand Down Expand Up @@ -158,11 +159,13 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
) -> Self {
let author = node_config.validator_network.as_ref().unwrap().peer_id();
let config = node_config.consensus.clone();
let execution_config = node_config.execution.clone();
let sr_config = &node_config.consensus.safety_rules;
let safety_rules_manager = SafetyRulesManager::new(sr_config);
Self {
author,
config,
execution_config,
time_service,
self_sender,
network_sender,
Expand Down
1 change: 1 addition & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub mod network_interface;
mod payload_manager;
mod sender_aware_shuffler;
mod transaction_deduper;
mod transaction_filter;
mod transaction_shuffler;
mod txn_hash_and_authenticator_deduper;

Expand Down
12 changes: 10 additions & 2 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
payload_manager::PayloadManager,
state_replication::{StateComputer, StateComputerCommitCallBackType},
transaction_deduper::TransactionDeduper,
transaction_filter::TransactionFilter,
transaction_shuffler::TransactionShuffler,
txn_notifier::TxnNotifier,
};
Expand Down Expand Up @@ -60,6 +61,7 @@ pub struct ExecutionProxy {
transaction_shuffler: Mutex<Option<Arc<dyn TransactionShuffler>>>,
maybe_block_gas_limit: Mutex<Option<u64>>,
transaction_deduper: Mutex<Option<Arc<dyn TransactionDeduper>>>,
transaction_filter: TransactionFilter,
}

impl ExecutionProxy {
Expand All @@ -68,6 +70,7 @@ impl ExecutionProxy {
txn_notifier: Arc<dyn TxnNotifier>,
state_sync_notifier: Arc<dyn ConsensusNotificationSender>,
handle: &tokio::runtime::Handle,
txn_filter: TransactionFilter,
) -> Self {
let (tx, mut rx) =
aptos_channels::new::<NotificationType>(10, &counters::PENDING_STATE_SYNC_NOTIFICATION);
Expand Down Expand Up @@ -95,6 +98,7 @@ impl ExecutionProxy {
transaction_shuffler: Mutex::new(None),
maybe_block_gas_limit: Mutex::new(None),
transaction_deduper: Mutex::new(None),
transaction_filter: txn_filter,
}
}
}
Expand Down Expand Up @@ -126,7 +130,8 @@ impl StateComputer for ExecutionProxy {
let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone();
let txns = payload_manager.get_transactions(block).await?;

let deduped_txns = txn_deduper.dedup(txns);
let filtered_txns = self.transaction_filter.filter(block_id, txns);
let deduped_txns = txn_deduper.dedup(filtered_txns);
let shuffled_txns = txn_shuffler.shuffle(deduped_txns);

let block_gas_limit = *self.maybe_block_gas_limit.lock();
Expand Down Expand Up @@ -201,7 +206,8 @@ impl StateComputer for ExecutionProxy {
}

let signed_txns = payload_manager.get_transactions(block.block()).await?;
let deduped_txns = txn_deduper.dedup(signed_txns);
let filtered_txns = self.transaction_filter.filter(block.id(), signed_txns);
let deduped_txns = txn_deduper.dedup(filtered_txns);
let shuffled_txns = txn_shuffler.shuffle(deduped_txns);

txns.extend(block.transactions_to_commit(
Expand Down Expand Up @@ -330,6 +336,7 @@ async fn test_commit_sync_race() {
error::MempoolError, transaction_deduper::create_transaction_deduper,
transaction_shuffler::create_transaction_shuffler,
};
use aptos_config::config::transaction_filter_type::Filter;
use aptos_consensus_notifications::Error;
use aptos_executor_types::state_checkpoint_output::StateCheckpointOutput;
use aptos_types::{
Expand Down Expand Up @@ -452,6 +459,7 @@ async fn test_commit_sync_race() {
recorded_commit.clone(),
recorded_commit.clone(),
&tokio::runtime::Handle::current(),
TransactionFilter::new(Filter::empty()),
);
executor.new_epoch(
&EpochState::empty(),
Expand Down
Loading

0 comments on commit 9156cc7

Please sign in to comment.