diff --git a/scripts/tests/api_compare/filter-list b/scripts/tests/api_compare/filter-list index d1715fbca395..86a63a4f6529 100644 --- a/scripts/tests/api_compare/filter-list +++ b/scripts/tests/api_compare/filter-list @@ -6,3 +6,5 @@ !Filecoin.StateReplay # TODO(elmattic): https://github.com/ChainSafe/forest/issues/4759 !Filecoin.EthGetTransactionReceipt +# TODO(elmattic): https://github.com/ChainSafe/forest/issues/4851 +!Filecoin.EthGetLogs diff --git a/scripts/tests/api_compare/filter-list-offline b/scripts/tests/api_compare/filter-list-offline index 05a7e99320f9..78dff6ad7d34 100644 --- a/scripts/tests/api_compare/filter-list-offline +++ b/scripts/tests/api_compare/filter-list-offline @@ -27,3 +27,5 @@ !Filecoin.ChainSetHead # TODO(elmattic): https://github.com/ChainSafe/forest/issues/4759 !Filecoin.EthGetTransactionReceipt +# TODO(elmattic): https://github.com/ChainSafe/forest/issues/4851 +!Filecoin.EthGetLogs diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 362a6a6bed87..f75daea6637e 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -5,8 +5,7 @@ use std::sync::Arc; use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey, TxMeta}; use crate::fil_cns; -use crate::interpreter::BlockMessages; -use crate::interpreter::VMTrace; +use crate::interpreter::{BlockMessages, VMEvent, VMTrace}; use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite}; use crate::message::{ChainMessage, Message as MessageTrait, SignedMessage}; use crate::networks::{ChainConfig, Height}; @@ -16,6 +15,7 @@ use crate::shim::{ address::Address, econ::TokenAmount, executor::Receipt, message::Message, state_tree::StateTree, version::NetworkVersion, }; +use crate::state_manager::StateOutput; use crate::utils::db::{BlockstoreExt, CborStoreExt}; use ahash::{HashMap, HashMapExt, HashSet}; use anyhow::Context; @@ -340,7 +340,7 @@ where // state-root without caching. let genesis_timestamp = heaviest_tipset.genesis(&chain_index.db)?.timestamp; let beacon = Arc::new(chain_config.get_beacon_schedule(genesis_timestamp)); - let (state, _) = crate::state_manager::apply_block_messages( + let StateOutput { state_root, .. 
} = crate::state_manager::apply_block_messages( genesis_timestamp, Arc::clone(&chain_index), Arc::clone(&chain_config), @@ -353,9 +353,10 @@ where Arc::clone(&heaviest_tipset), crate::state_manager::NO_CALLBACK, VMTrace::NotTraced, + VMEvent::NotPushed, ) .map_err(|e| Error::Other(e.to_string()))?; - return Ok((heaviest_tipset, state)); + return Ok((heaviest_tipset, state_root)); } let next_ts = chain_index diff --git a/src/cli_shared/cli/config.rs b/src/cli_shared/cli/config.rs index c1c87b989d84..7b15305934b9 100644 --- a/src/cli_shared/cli/config.rs +++ b/src/cli_shared/cli/config.rs @@ -52,6 +52,10 @@ impl Config { pub fn db_config(&self) -> &DbConfig { &self.parity_db } + + pub fn chain(&self) -> &NetworkChain { + &self.chain + } } #[cfg(test)] diff --git a/src/cli_shared/mod.rs b/src/cli_shared/mod.rs index 0e2a65b4f9ae..6d5f207acd74 100644 --- a/src/cli_shared/mod.rs +++ b/src/cli_shared/mod.rs @@ -18,7 +18,7 @@ pub use tikv_jemallocator; /// Gets chain data directory pub fn chain_path(config: &Config) -> PathBuf { - PathBuf::from(&config.client.data_dir).join(config.chain.to_string()) + PathBuf::from(&config.client.data_dir).join(config.chain().to_string()) } /// Gets car db path @@ -57,7 +57,7 @@ mod tests { let (config_path, config) = read_config(None, None).unwrap(); assert!(config_path.is_none()); - assert_eq!(config.chain, NetworkChain::Mainnet); + assert_eq!(config.chain(), &NetworkChain::Mainnet); } #[test] @@ -65,7 +65,7 @@ mod tests { let (config_path, config) = read_config(None, Some(NetworkChain::Calibnet)).unwrap(); assert!(config_path.is_none()); - assert_eq!(config.chain, NetworkChain::Calibnet); + assert_eq!(config.chain(), &NetworkChain::Calibnet); } #[test] @@ -73,7 +73,7 @@ mod tests { let (config_path, config) = read_config(None, Some(NetworkChain::Butterflynet)).unwrap(); assert!(config_path.is_none()); - assert_eq!(config.chain, NetworkChain::Butterflynet); + assert_eq!(config.chain(), &NetworkChain::Butterflynet); } #[test] @@ -86,7 +86,7 @@ mod tests { let (config_path, config) = read_config(Some(&path), None).unwrap(); assert_eq!(config_path.unwrap(), ConfigPath::Cli(path)); - assert_eq!(config.chain, NetworkChain::Mainnet); + assert_eq!(config.chain(), &NetworkChain::Mainnet); assert_eq!(config, default_config); } } diff --git a/src/daemon/main.rs b/src/daemon/main.rs index dacf6d751dcd..eee20ce399fa 100644 --- a/src/daemon/main.rs +++ b/src/daemon/main.rs @@ -101,7 +101,7 @@ where } check_for_unknown_keys(path.to_path_buf(), &cfg); } else { - info!("Using default {} config", cfg.chain); + info!("Using default {} config", cfg.chain()); } if opts.dry_run { return Ok(()); diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index db869f034a2c..94b167b2c9f5 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -147,7 +147,7 @@ pub(super) async fn start( config: Config, shutdown_send: mpsc::Sender<()>, ) -> anyhow::Result<()> { - let chain_config = Arc::new(ChainConfig::from_chain(&config.chain)); + let chain_config = Arc::new(ChainConfig::from_chain(config.chain())); if chain_config.is_testnet() { CurrentNetwork::set_global(Network::Testnet); } @@ -189,7 +189,7 @@ pub(super) async fn start( load_all_forest_cars(&db, &forest_car_db_dir)?; if config.client.load_actors && !opts.stateless { - load_actor_bundles(&db, &config.chain).await?; + load_actor_bundles(&db, config.chain()).await?; } let mut services = JoinSet::new(); @@ -296,7 +296,7 @@ pub(super) async fn start( let network_name = get_network_name_from_genesis(&genesis_header, &state_manager)?; 
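    // Editor's note (illustrative, not part of this diff): `apply_block_messages` and
    // `compute_tipset_state` now return a `StateOutput` instead of the old
    // `(state_root, receipt_root)` tuple, and take a `VMEvent` flag that controls whether
    // per-message events are collected. A hedged sketch of the new call shape, mirroring
    // the call sites in this diff (argument list abridged; anything not shown in the diff
    // is an assumption):
    //
    //     let StateOutput { state_root, receipt_root, events } =
    //         crate::state_manager::apply_block_messages(
    //             /* ..same leading arguments as before.. */
    //             crate::state_manager::NO_CALLBACK,
    //             VMTrace::NotTraced,
    //             VMEvent::Pushed, // use VMEvent::NotPushed when events are not needed
    //         )?;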
info!("Using network :: {}", get_actual_chain_name(&network_name)); - utils::misc::display_chain_logo(&config.chain); + utils::misc::display_chain_logo(config.chain()); let (tipset_sender, tipset_receiver) = flume::bounded(20); // if bootstrap peers are not set, set them @@ -424,7 +424,10 @@ pub(super) async fn start( )) .expect("F3 lease manager should not have been initialized before"); let chain_config = chain_config.clone(); - let default_f3_root = config.client.data_dir.join(format!("f3/{}", config.chain)); + let default_f3_root = config + .client + .data_dir + .join(format!("f3/{}", config.chain())); let crate::f3::F3Options { chain_finality, bootstrap_epoch, @@ -553,7 +556,7 @@ async fn set_snapshot_path_if_needed( } let vendor = snapshot::TrustedVendor::default(); - let chain = &config.chain; + let chain = config.chain(); // What height is our chain at right now, and what network version does that correspond to? let network_version = chain_config.network_version(epoch); diff --git a/src/db/migration/v0_19_0.rs b/src/db/migration/v0_19_0.rs index a6012bdebe2e..7f747312c340 100644 --- a/src/db/migration/v0_19_0.rs +++ b/src/db/migration/v0_19_0.rs @@ -154,7 +154,7 @@ async fn create_state_manager_and_populate(config: Config, db_name: String) -> a let forest_car_db_dir = db_root_dir.join(CAR_DB_DIR_NAME); load_all_forest_cars(&db, &forest_car_db_dir)?; - let chain_config = Arc::new(ChainConfig::from_chain(&config.chain)); + let chain_config = Arc::new(ChainConfig::from_chain(config.chain())); let genesis_header = read_genesis_header( config.client.genesis_file.as_deref(), @@ -173,7 +173,7 @@ async fn create_state_manager_and_populate(config: Config, db_name: String) -> a let state_manager = StateManager::new( Arc::clone(&chain_store), - Arc::clone(&chain_config), + chain_config, Arc::new(config.sync.clone()), )?; diff --git a/src/interpreter/vm.rs b/src/interpreter/vm.rs index 67c604449c1d..00a21abbc423 100644 --- a/src/interpreter/vm.rs +++ b/src/interpreter/vm.rs @@ -17,7 +17,7 @@ use crate::networks::{ChainConfig, NetworkChain}; use crate::shim::{ address::Address, econ::TokenAmount, - executor::{ApplyRet, Receipt}, + executor::{ApplyRet, Receipt, StampedEvent}, externs::{Rand, RandWrapper}, machine::MultiEngine, message::{Message, Message_v3}, @@ -76,6 +76,8 @@ type ForestExecutorV4 = DefaultExecutor_v4>; pub type ApplyResult = anyhow::Result<(ApplyRet, Duration)>; +pub type ApplyBlockResult = anyhow::Result<(Vec, Vec>), anyhow::Error>; + /// Comes from pub const IMPLICIT_MESSAGE_GAS_LIMIT: i64 = i64::MAX / 2; @@ -351,8 +353,10 @@ where messages: &[BlockMessages], epoch: ChainEpoch, mut callback: Option) -> anyhow::Result<()>>, - ) -> Result, anyhow::Error> { + enable_event_pushing: VMEvent, + ) -> ApplyBlockResult { let mut receipts = Vec::new(); + let mut events = Vec::new(); let mut processed = HashSet::::default(); for block in messages.iter() { @@ -383,6 +387,10 @@ where let msg_receipt = ret.msg_receipt(); receipts.push(msg_receipt.clone()); + if enable_event_pushing.is_pushed() { + events.push(ret.events()); + } + // Add processed Cid to set of processed messages processed.insert(cid); Ok(()) @@ -428,7 +436,7 @@ where tracing::error!("End of epoch cron failed to run: {}", e); } - Ok(receipts) + Ok((receipts, events)) } /// Applies single message through VM and returns result from execution. @@ -590,3 +598,20 @@ impl VMTrace { matches!(self, VMTrace::Traced) } } + +/// This controls whether we should push or not events when applying block messages. 
+#[derive(Default, Clone, Copy)] +pub enum VMEvent { + /// Push event during [`VM::apply_block_messages`] + Pushed, + /// Do not push event + #[default] + NotPushed, +} + +impl VMEvent { + /// Should event be pushed? + pub fn is_pushed(&self) -> bool { + matches!(self, VMEvent::Pushed) + } +} diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 73a9eab111c5..168f034fbe40 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -9,7 +9,7 @@ use self::eth_tx::*; use self::filter::hex_str_to_epoch; use self::types::*; use super::gas; -use crate::blocks::Tipset; +use crate::blocks::{Tipset, TipsetKey}; use crate::chain::{index::ResolveNullTipset, ChainStore}; use crate::chain_sync::SyncStage; use crate::cid_collections::CidHashSet; @@ -22,7 +22,7 @@ use crate::interpreter::VMTrace; use crate::lotus_json::{lotus_json_with_self, HasLotusJson}; use crate::message::{ChainMessage, Message as _, SignedMessage}; use crate::rpc::error::ServerError; -use crate::rpc::types::{ApiTipsetKey, MessageLookup}; +use crate::rpc::types::{ApiTipsetKey, EventEntry, MessageLookup}; use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod}; use crate::shim::actors::eam; use crate::shim::actors::is_evm_actor; @@ -217,8 +217,6 @@ impl From<[u8; EVM_WORD_LENGTH]> for EthHash { } } -lotus_json_with_self!(EthHash); - #[derive(PartialEq, Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] #[serde(rename_all = "camelCase")] pub enum Predefined { @@ -519,14 +517,25 @@ impl EthTxReceipt { #[derive(PartialEq, Debug, Default, Clone, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct EthLog { + /// The address of the actor that produced the event log. address: EthAddress, + /// The value of the event log, excluding topics. data: EthBytes, + /// List of topics associated with the event log. topics: Vec, + /// Indicates whether the log was removed due to a chain reorganization. removed: bool, + /// The index of the event log in the sequence of events produced by the message execution. + /// (this is the index in the events AMT on the message receipt) log_index: EthUint64, + /// The index in the tipset of the transaction that produced the event log. + /// The index corresponds to the sequence of messages produced by `ChainGetParentMessages` transaction_index: EthUint64, + /// The hash of the RLP message that produced the event log. transaction_hash: EthHash, + /// The hash of the tipset containing the message that produced the log. block_hash: EthHash, + /// The epoch of the tipset containing the message. block_number: EthUint64, } lotus_json_with_self!(EthLog); @@ -2252,9 +2261,214 @@ impl RpcMethod<1> for EthGetTransactionReceipt { } } +#[derive(Debug)] +pub struct CollectedEvent { + entries: Vec, + emitter_addr: crate::shim::address::Address, + pub event_idx: u64, + reverted: bool, + height: ChainEpoch, + tipset_key: TipsetKey, + msg_idx: u64, + msg_cid: Cid, +} + +fn match_key(key: &str) -> Option { + match key.get(0..2) { + Some("t1") => Some(0), + Some("t2") => Some(1), + Some("t3") => Some(2), + Some("t4") => Some(3), + _ => None, + } +} + +fn eth_log_from_event(entries: &[EventEntry]) -> Option<(EthBytes, Vec)> { + let mut topics_found = [false; 4]; + let mut topics_found_count = 0; + let mut data_found = false; + let mut data: EthBytes = EthBytes::default(); + let mut topics: Vec = Vec::default(); + for entry in entries { + // Drop events with non-raw topics. Built-in actors emit CBOR, and anything else would be + // invalid anyway. 
+ if entry.codec != IPLD_RAW { + return None; + } + // Check if the key is t1..t4 + if let Some(idx) = match_key(&entry.key) { + // Drop events with mis-sized topics. + let result: Result<[u8; EVM_WORD_LENGTH], _> = entry.value.0.clone().try_into(); + let bytes = if let Ok(value) = result { + value + } else { + tracing::warn!( + "got an EVM event topic with an invalid size (key: {}, size: {})", + entry.key, + entry.value.0.len() + ); + return None; + }; + // Drop events with duplicate topics. + if *topics_found.get(idx).expect("Infallible") { + tracing::warn!("got a duplicate EVM event topic (key: {})", entry.key); + return None; + } + *topics_found.get_mut(idx).expect("Infallible") = true; + topics_found_count += 1; + // Extend the topics array + if topics.len() <= idx { + topics.resize(idx + 1, EthHash::default()); + } + *topics.get_mut(idx).expect("Infallible") = bytes.into(); + } else if entry.key == "d" { + // Drop events with duplicate data fields. + if data_found { + tracing::warn!("got duplicate EVM event data"); + return None; + } + data_found = true; + data = EthBytes(entry.value.0.clone()); + } else { + // Skip entries we don't understand (makes it easier to extend things). + // But we warn for now because we don't expect them. + tracing::warn!("unexpected event entry (key: {})", entry.key); + } + } + // Drop events with skipped topics. + if topics.len() != topics_found_count { + tracing::warn!( + "EVM event topic length mismatch (expected: {}, actual: {})", + topics.len(), + topics_found_count + ); + return None; + } + Some((data, topics)) +} + +fn eth_tx_hash_from_signed_message( + message: &SignedMessage, + eth_chain_id: EthChainIdType, +) -> anyhow::Result { + if message.is_delegated() { + let (_, tx) = eth_tx_from_signed_eth_message(message, eth_chain_id)?; + Ok(tx.eth_hash()?.into()) + } else if message.is_secp256k1() { + Ok(message.cid().into()) + } else { + Ok(message.message().cid().into()) + } +} + +fn eth_tx_hash_from_message_cid( + blockstore: &DB, + message_cid: &Cid, + eth_chain_id: EthChainIdType, +) -> anyhow::Result> { + if let Ok(smsg) = crate::chain::message_from_cid(blockstore, message_cid) { + // This is an Eth Tx, Secp message, Or BLS message in the mpool + return Ok(Some(eth_tx_hash_from_signed_message(&smsg, eth_chain_id)?)); + } + let result: Result = crate::chain::message_from_cid(blockstore, message_cid); + if result.is_ok() { + // This is a BLS message + let hash: EthHash = (*message_cid).into(); + return Ok(Some(hash)); + } + Ok(None) +} + +fn transform_events(events: &[CollectedEvent], f: F) -> anyhow::Result> +where + F: Fn(&CollectedEvent) -> anyhow::Result>, +{ + events + .iter() + .filter_map(|event| match f(event) { + Ok(Some(eth_log)) => Some(Ok(eth_log)), + Ok(None) => None, + Err(e) => Some(Err(e)), + }) + .collect() +} + +fn eth_filter_logs_from_events( + ctx: &Ctx, + events: &[CollectedEvent], +) -> anyhow::Result> { + transform_events(events, |event| { + let (data, topics) = if let Some((data, topics)) = eth_log_from_event(&event.entries) { + (data, topics) + } else { + tracing::warn!("Ignoring event"); + return Ok(None); + }; + let transaction_hash = if let Some(transaction_hash) = eth_tx_hash_from_message_cid( + ctx.store(), + &event.msg_cid, + ctx.state_manager.chain_config().eth_chain_id, + )? 
{ + transaction_hash + } else { + tracing::warn!("Ignoring event"); + return Ok(None); + }; + let address = EthAddress::from_filecoin_address(&event.emitter_addr)?; + Ok(Some(EthLog { + address, + data, + topics, + removed: event.reverted, + log_index: event.event_idx.into(), + transaction_index: event.msg_idx.into(), + transaction_hash, + block_hash: event.tipset_key.cid()?.into(), + block_number: (event.height as u64).into(), + })) + }) +} + +fn eth_filter_result_from_events( + ctx: &Ctx, + events: &[CollectedEvent], +) -> anyhow::Result { + Ok(EthFilterResult::Logs(eth_filter_logs_from_events( + ctx, events, + )?)) +} + +pub enum EthGetLogs {} +impl RpcMethod<1> for EthGetLogs { + const NAME: &'static str = "Filecoin.EthGetLogs"; + const NAME_ALIAS: Option<&'static str> = Some("eth_getLogs"); + const N_REQUIRED_PARAMS: usize = 1; + const PARAM_NAMES: [&'static str; 1] = ["eth_filter"]; + const API_PATHS: ApiPaths = ApiPaths::V1; + const PERMISSION: Permission = Permission::Read; + type Params = (EthFilterSpec,); + type Ok = EthFilterResult; + async fn handle( + ctx: Ctx, + (eth_filter,): Self::Params, + ) -> Result { + let events = ctx + .eth_event_handler + .eth_get_events_for_filter(&ctx, eth_filter) + .await?; + Ok(eth_filter_result_from_events(&ctx, &events)?) + } +} + #[cfg(test)] mod test { use super::*; + use crate::rpc::eth::EventEntry; + use crate::{ + db::MemoryDB, + test_utils::{construct_bls_messages, construct_eth_messages, construct_messages}, + }; + use fvm_shared4::event::Flags; use quickcheck::Arbitrary; use quickcheck_macros::quickcheck; @@ -2381,4 +2595,305 @@ mod test { let block = Block::new(true, 1); assert_eq!(block.transactions_root, EthHash::default()); } + + #[test] + fn test_eth_tx_hash_from_signed_message() { + let (_, signed) = construct_eth_messages(0); + let tx_hash = + eth_tx_hash_from_signed_message(&signed, crate::networks::calibnet::ETH_CHAIN_ID) + .unwrap(); + assert_eq!( + &format!("{}", tx_hash), + "0xfc81dd8d9ffb045e7e2d494f925824098183263c7f402d69e18cc25e3422791b" + ); + + let (_, signed) = construct_messages(); + let tx_hash = + eth_tx_hash_from_signed_message(&signed, crate::networks::calibnet::ETH_CHAIN_ID) + .unwrap(); + assert_eq!(tx_hash.to_cid(), signed.cid()); + + let (_, signed) = construct_bls_messages(); + let tx_hash = + eth_tx_hash_from_signed_message(&signed, crate::networks::calibnet::ETH_CHAIN_ID) + .unwrap(); + assert_eq!(tx_hash.to_cid(), signed.message().cid()); + } + + #[test] + fn test_eth_tx_hash_from_message_cid() { + let blockstore = Arc::new(MemoryDB::default()); + + let (msg0, secp0) = construct_eth_messages(0); + let (_msg1, secp1) = construct_eth_messages(1); + let (msg2, bls0) = construct_bls_messages(); + + crate::chain::persist_objects(&blockstore, [msg0.clone(), msg2.clone()].iter()).unwrap(); + crate::chain::persist_objects(&blockstore, [secp0.clone(), bls0.clone()].iter()).unwrap(); + + let tx_hash = eth_tx_hash_from_message_cid(&blockstore, &secp0.cid(), 0).unwrap(); + assert!(tx_hash.is_some()); + + let tx_hash = eth_tx_hash_from_message_cid(&blockstore, &msg2.cid(), 0).unwrap(); + assert!(tx_hash.is_some()); + + let tx_hash = eth_tx_hash_from_message_cid(&blockstore, &secp1.cid(), 0).unwrap(); + assert!(tx_hash.is_none()); + } + + #[test] + fn test_eth_log_from_event() { + // The value member of these event entries correspond to existing topics on Calibnet, + // but they could just as easily be vectors filled with random bytes. 
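        // Editor's note (illustrative, not part of this diff): the `EthGetLogs` method
        // added above can be exercised with the same request-builder pattern used by the
        // API comparison tests later in this diff. A hedged sketch (the epoch value below
        // is hypothetical):
        //
        //     let req = EthGetLogs::request((EthFilterSpec {
        //         from_block: Some("0x4ff20".into()), // hypothetical epoch, hex-encoded
        //         to_block: Some("0x4ff20".into()),
        //         address: vec![],  // empty list matches any emitter
        //         topics: None,     // no topic constraints
        //         block_hash: None, // mutually exclusive with from_block/to_block
        //     },))?;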
+ + let entries = vec![ + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t2".into(), + codec: IPLD_RAW, + value: vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ] + .into(), + }, + ]; + let (bytes, hashes) = eth_log_from_event(&entries).unwrap(); + assert!(bytes.0.is_empty()); + assert_eq!(hashes.len(), 2); + + let entries = vec![ + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t2".into(), + codec: IPLD_RAW, + value: vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t3".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t4".into(), + codec: IPLD_RAW, + value: vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ] + .into(), + }, + ]; + let (bytes, hashes) = eth_log_from_event(&entries).unwrap(); + assert!(bytes.0.is_empty()); + assert_eq!(hashes.len(), 4); + + let entries = vec![ + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ] + .into(), + }, + ]; + assert!(eth_log_from_event(&entries).is_none()); + + let entries = vec![ + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t3".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t4".into(), + codec: IPLD_RAW, + value: vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }, + EventEntry { + 
flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t2".into(), + codec: IPLD_RAW, + value: vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ] + .into(), + }, + ]; + let (bytes, hashes) = eth_log_from_event(&entries).unwrap(); + assert!(bytes.0.is_empty()); + assert_eq!(hashes.len(), 4); + + let entries = vec![ + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t3".into(), + codec: IPLD_RAW, + value: vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ] + .into(), + }, + ]; + assert!(eth_log_from_event(&entries).is_none()); + + let entries = vec![EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: DAG_CBOR, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, 81, + 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }]; + assert!(eth_log_from_event(&entries).is_none()); + + let entries = vec![EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, 81, + 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, 0, + ] + .into(), + }]; + assert!(eth_log_from_event(&entries).is_none()); + + let entries = vec![ + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "d".into(), + codec: IPLD_RAW, + value: vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 49, 190, + 25, 34, 116, 232, 27, 26, 248, + ] + .into(), + }, + ]; + let (bytes, hashes) = eth_log_from_event(&entries).unwrap(); + assert_eq!(bytes.0.len(), 32); + assert_eq!(hashes.len(), 1); + + let entries = vec![ + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "t1".into(), + codec: IPLD_RAW, + value: vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, 0, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "d".into(), + codec: IPLD_RAW, + value: vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 49, 190, + 25, 34, 116, 232, 27, 26, 248, + ] + .into(), + }, + EventEntry { + flags: (Flags::FLAG_INDEXED_ALL).bits(), + key: "d".into(), + codec: IPLD_RAW, + value: vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 49, 190, + 25, 34, 116, 232, 27, 26, 248, + ] + .into(), + }, + ]; + assert!(eth_log_from_event(&entries).is_none()); + } } diff --git a/src/rpc/methods/eth/filter/event.rs b/src/rpc/methods/eth/filter/event.rs index 32c6b34b3eb5..9648c26abf77 100644 --- a/src/rpc/methods/eth/filter/event.rs +++ b/src/rpc/methods/eth/filter/event.rs @@ -1,15 +1,12 @@ // Copyright 2019-2024 
ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT
 
-use crate::rpc::eth::filter::ActorEventBlock;
-use crate::rpc::eth::filter::ParsedFilter;
+use crate::rpc::eth::filter::{ActorEventBlock, ParsedFilter, ParsedFilterTipsets};
 use crate::rpc::eth::{filter::Filter, FilterID};
 use crate::rpc::Arc;
 use crate::shim::address::Address;
-use crate::shim::clock::ChainEpoch;
 use ahash::AHashMap as HashMap;
 use anyhow::{Context, Result};
-use cid::Cid;
 use parking_lot::RwLock;
 use std::any::Any;
 
@@ -17,9 +14,7 @@ use std::any::Any;
 #[derive(Debug, PartialEq)]
 pub struct EventFilter {
     id: FilterID,
-    min_height: ChainEpoch, // minimum epoch to apply filter
-    max_height: ChainEpoch, // maximum epoch to apply filter
-    tipset_cid: Cid,
+    tipsets: ParsedFilterTipsets,
     addresses: Vec<Address>
, // list of actor addresses that are extpected to emit the event keys_with_codec: HashMap>, // map of key names to a list of alternate values that may match max_results: usize, // maximum number of results to collect @@ -41,8 +36,6 @@ impl Filter for EventFilter { pub struct EventFilterManager { filters: RwLock>>, max_filter_results: usize, - // TODO(elmattic): https://github.com/ChainSafe/forest/issues/4740 - //pub event_index: Option>, } impl EventFilterManager { @@ -58,9 +51,7 @@ impl EventFilterManager { let filter = Arc::new(EventFilter { id: id.clone(), - min_height: pf.min_height, - max_height: pf.max_height, - tipset_cid: pf.tipset_cid, + tipsets: pf.tipsets, addresses: pf.addresses, keys_with_codec: pf.keys, max_results: self.max_filter_results, @@ -80,10 +71,9 @@ impl EventFilterManager { #[cfg(test)] mod tests { use super::*; - use crate::rpc::eth::filter::ParsedFilter; + use crate::rpc::eth::filter::{ParsedFilter, ParsedFilterTipsets}; use crate::shim::address::Address; - use crate::shim::clock::ChainEpoch; - use cid::Cid; + use std::ops::RangeInclusive; #[test] fn test_event_filter() { @@ -91,9 +81,7 @@ mod tests { let event_manager = EventFilterManager::new(max_filter_results); let parsed_filter = ParsedFilter { - min_height: ChainEpoch::from(0), - max_height: ChainEpoch::from(100), - tipset_cid: Cid::default(), + tipsets: ParsedFilterTipsets::Range(RangeInclusive::new(0, 100)), addresses: vec![Address::new_id(123)], keys: HashMap::new(), }; diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index b27c17c88f42..7dd702f618e7 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -19,20 +19,30 @@ mod mempool; mod store; mod tipset; +use super::get_tipset_from_hash; use super::BlockNumberOrHash; +use super::CollectedEvent; use super::Predefined; +use crate::blocks::Tipset; +use crate::chain::index::ResolveNullTipset; use crate::rpc::eth::filter::event::*; use crate::rpc::eth::filter::mempool::*; use crate::rpc::eth::filter::tipset::*; use crate::rpc::eth::types::*; +use crate::rpc::eth::EVM_WORD_LENGTH; +use crate::rpc::reflect::Ctx; +use crate::rpc::types::EventEntry; use crate::shim::address::Address; use crate::shim::clock::ChainEpoch; +use crate::shim::executor::Entry; +use crate::state_manager::StateEvents; use crate::utils::misc::env::env_or_default; use ahash::AHashMap as HashMap; use anyhow::{anyhow, bail, ensure, Context, Error}; -use cid::Cid; +use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::IPLD_RAW; use serde::*; +use std::ops::RangeInclusive; use std::sync::Arc; use store::*; @@ -186,6 +196,180 @@ impl EthEventHandler { Ok(false) } } + + fn parse_eth_filter_spec( + &self, + ctx: &Ctx, + filter_spec: &EthFilterSpec, + ) -> anyhow::Result { + EthFilterSpec::parse_eth_filter_spec( + filter_spec, + ctx.chain_store().heaviest_tipset().epoch(), + self.max_filter_height_range, + ) + } + + fn do_match(spec: &EthFilterSpec, eth_emitter_addr: &EthAddress, entries: &[Entry]) -> bool { + fn get_word(value: &[u8]) -> Option<&[u8; EVM_WORD_LENGTH]> { + value.get(..EVM_WORD_LENGTH)?.try_into().ok() + } + + let match_addr = if spec.address.is_empty() { + true + } else { + spec.address.iter().any(|other| other == eth_emitter_addr) + }; + let match_topics = if let Some(spec) = spec.topics.as_ref() { + let matched = entries.iter().enumerate().all(|(i, entry)| { + if let Some(slice) = get_word(entry.value()) { + let hash: EthHash = (*slice).into(); + match spec.0.get(i) { + Some(EthHashList::List(vec)) => 
vec.contains(&hash), + Some(EthHashList::Single(Some(h))) => h == &hash, + _ => true, /* wildcard */ + } + } else { + // Drop events with mis-sized topics + false + } + }); + matched + } else { + true + }; + match_addr && match_topics + } + + async fn collect_events( + ctx: &Ctx, + tipset: &Arc, + spec: &EthFilterSpec, + collected_events: &mut Vec, + ) -> anyhow::Result<()> { + let tipset_key = tipset.key().clone(); + let height = tipset.epoch(); + + let messages = ctx.chain_store().messages_for_tipset(tipset)?; + + let StateEvents { events, .. } = ctx.state_manager.tipset_state_events(tipset).await?; + + ensure!( + messages.len() == events.len(), + "Length of messages and events do not match" + ); + for (i, (message, events)) in messages.iter().zip(events.into_iter()).enumerate() { + for (j, event) in events.iter().enumerate() { + let id_addr = Address::new_id(event.emitter()); + let result = ctx + .state_manager + .resolve_to_deterministic_address(id_addr, tipset.clone()) + .await + .with_context(|| { + format!( + "resolving address {} failed (EPOCH = {})", + id_addr, + tipset.epoch() + ) + }); + let resolved = if let Ok(resolved) = result { + resolved + } else { + // Skip event + continue; + }; + + let event_idx = j as u64; + + let eth_emitter_addr = EthAddress::from_filecoin_address(&resolved)?; + + let entries: Vec = event.event().entries(); + // dbg!(&entries); + + let matched = Self::do_match(spec, ð_emitter_addr, &entries); + tracing::debug!( + "Event {} {}match filter topics", + event_idx, + if matched { "" } else { "do not " } + ); + if matched { + let entries: Vec = entries + .into_iter() + .map(|entry| { + let (flags, key, codec, value) = entry.into_parts(); + EventEntry { + flags, + key, + codec, + value: value.into(), + } + }) + .collect(); + + let ce = CollectedEvent { + entries, + emitter_addr: resolved, + event_idx, + reverted: false, + height, + tipset_key: tipset_key.clone(), + msg_idx: i as u64, + msg_cid: message.cid(), + }; + collected_events.push(ce); + } + } + } + Ok(()) + } + + pub async fn eth_get_events_for_filter( + &self, + ctx: &Ctx, + spec: EthFilterSpec, + ) -> anyhow::Result> { + let pf = self.parse_eth_filter_spec(ctx, &spec)?; + + let mut collected_events = vec![]; + match pf.tipsets { + ParsedFilterTipsets::Hash(block_hash) => { + let tipset = get_tipset_from_hash(ctx.chain_store(), &block_hash)?; + let tipset = Arc::new(tipset); + Self::collect_events(ctx, &tipset, &spec, &mut collected_events).await?; + } + ParsedFilterTipsets::Range(range) => { + let max_height = if *range.end() == -1 { + // heaviest tipset doesn't have events because its messages haven't been executed yet + ctx.chain_store().heaviest_tipset().epoch() - 1 + } else if *range.end() < 0 { + bail!("max_height requested is less than 0") + } else if *range.end() > ctx.chain_store().heaviest_tipset().epoch() - 1 { + // we can't return events for the heaviest tipset as the transactions in that tipset will be executed + // in the next non-null tipset (because of Filecoin's "deferred execution" model) + bail!("max_height requested is greater than the heaviest tipset"); + } else { + *range.end() + }; + + let num_tipsets = (range.end() - range.start()) as usize + 1; + let max_tipset = ctx.chain_store().chain_index.tipset_by_height( + max_height, + ctx.chain_store().heaviest_tipset(), + ResolveNullTipset::TakeOlder, + )?; + for tipset in max_tipset + .as_ref() + .clone() + .chain(&ctx.store()) + .take(num_tipsets) + { + let tipset = Arc::new(tipset); + Self::collect_events(ctx, &tipset, &spec, 
&mut collected_events).await?; + } + } + } + + Ok(collected_events) + } } impl EthFilterSpec { @@ -194,21 +378,21 @@ impl EthFilterSpec { chain_height: i64, max_filter_height_range: i64, ) -> Result { - let from_block = self.from_block.as_deref().unwrap_or(""); - let to_block = self.to_block.as_deref().unwrap_or(""); - let (min_height, max_height, tipset_cid) = if let Some(block_hash) = &self.block_hash { + let tipsets = if let Some(block_hash) = &self.block_hash { if self.from_block.is_some() || self.to_block.is_some() { bail!("must not specify block hash and from/to block"); } - (0, 0, block_hash.to_cid()) + ParsedFilterTipsets::Hash(block_hash.clone()) } else { + let from_block = self.from_block.as_deref().unwrap_or(""); + let to_block = self.to_block.as_deref().unwrap_or(""); let (min, max) = parse_block_range( chain_height, BlockNumberOrHash::from_str(from_block)?, BlockNumberOrHash::from_str(to_block)?, max_filter_height_range, )?; - (min, max, Cid::default()) + ParsedFilterTipsets::Range(RangeInclusive::new(min, max)) }; let addresses: Vec<_> = self @@ -220,14 +404,16 @@ impl EthFilterSpec { }) .collect::, _>>()?; - let keys = parse_eth_topics(&self.topics)?; + let keys = if let Some(topics) = &self.topics { + keys_to_keys_with_codec(parse_eth_topics(topics)?) + } else { + HashMap::new() + }; Ok(ParsedFilter { - min_height, - max_height, - tipset_cid, + tipsets, addresses, - keys: keys_to_keys_with_codec(keys), + keys, }) } } @@ -301,11 +487,20 @@ fn parse_eth_topics( let mut keys: HashMap>> = HashMap::with_capacity(4); // Each eth log entry can contain up to 4 topics for (idx, eth_hash_list) in topics.iter().enumerate() { - let EthHashList(hashes) = eth_hash_list; let key = format!("t{}", idx + 1); - for eth_hash in hashes { - let EthHash(bytes) = eth_hash; - keys.entry(key.clone()).or_default().push(bytes.0.to_vec()); + match eth_hash_list { + EthHashList::List(hashes) => { + let key = format!("t{}", idx + 1); + for eth_hash in hashes { + let EthHash(bytes) = eth_hash; + keys.entry(key.clone()).or_default().push(bytes.0.to_vec()); + } + } + EthHashList::Single(Some(hash)) => { + let EthHash(bytes) = hash; + keys.entry(key.clone()).or_default().push(bytes.0.to_vec()); + } + EthHashList::Single(None) => {} } } Ok(keys) @@ -337,16 +532,22 @@ fn keys_to_keys_with_codec( keys_with_codec } +#[derive(Debug, PartialEq)] +enum ParsedFilterTipsets { + Range(std::ops::RangeInclusive), + Hash(EthHash), +} + struct ParsedFilter { - min_height: ChainEpoch, - max_height: ChainEpoch, - tipset_cid: Cid, + tipsets: ParsedFilterTipsets, addresses: Vec
, keys: HashMap>, } #[cfg(test)] mod tests { + use fvm_shared4::event::Flags; + use super::*; use crate::rpc::eth::{EthAddress, EthFilterSpec, EthTopicSpec}; use std::str::FromStr; @@ -359,7 +560,7 @@ mod tests { address: vec![ EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(), ], - topics: EthTopicSpec(vec![]), + topics: None, block_hash: None, }; @@ -379,7 +580,7 @@ mod tests { address: vec![ EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(), ], - topics: EthTopicSpec(vec![]), + topics: None, block_hash: None, }; @@ -456,7 +657,7 @@ mod tests { #[test] fn test_parse_eth_topics() { - let topics = EthTopicSpec(vec![EthHashList(vec![EthHash::default()])]); + let topics = EthTopicSpec(vec![EthHashList::List(vec![EthHash::default()])]); let actual = parse_eth_topics(&topics).expect("Failed to parse topics"); let mut expected = HashMap::with_capacity(4); @@ -496,7 +697,7 @@ mod tests { address: vec![ EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(), ], - topics: EthTopicSpec(vec![]), + topics: None, block_hash: None, }; @@ -535,7 +736,7 @@ mod tests { address: vec![ EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(), ], - topics: EthTopicSpec(vec![]), + topics: None, block_hash: None, }; @@ -557,4 +758,247 @@ mod tests { ); } } + + #[test] + fn test_do_match_address() { + let empty_spec = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: None, + block_hash: None, + }; + + let eth_addr0 = EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(); + + let eth_addr1 = EthAddress::from_str("0x26937d59db4463254c930d5f31353f14aa89a0f7").unwrap(); + + let entries0 = vec![ + Entry::new( + Flags::FLAG_INDEXED_ALL, + "t1".into(), + IPLD_RAW, + vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ], + ), + Entry::new( + Flags::FLAG_INDEXED_ALL, + "t2".into(), + IPLD_RAW, + vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ], + ), + Entry::new( + Flags::FLAG_INDEXED_ALL, + "d".into(), + IPLD_RAW, + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, + 254, 169, 229, 74, 6, 24, 52, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 13, 232, 134, 151, 206, 121, 139, 231, 226, 192, + ], + ), + ]; + + // Matching an empty spec + assert!(EthEventHandler::do_match(&empty_spec, ð_addr0, &[])); + + assert!(EthEventHandler::do_match( + &empty_spec, + ð_addr0, + &entries0 + )); + + // Matching the given address 0 + let spec0 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![eth_addr0.clone()], + topics: None, + block_hash: None, + }; + + assert!(EthEventHandler::do_match(&spec0, ð_addr0, &[])); + + assert!(!EthEventHandler::do_match(&spec0, ð_addr1, &[])); + + // Matching the given address 0 or 1 + let spec1 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![eth_addr0.clone(), eth_addr1.clone()], + topics: None, + block_hash: None, + }; + + assert!(EthEventHandler::do_match(&spec1, ð_addr0, &[])); + + assert!(EthEventHandler::do_match(&spec1, ð_addr1, &[])); + } + + #[test] + fn test_do_match_topic() { + let eth_addr0 = EthAddress::from_str("0xff38c072f286e3b20b3954ca9f99c05fbecc64aa").unwrap(); + + let entries0 = vec![ + Entry::new( + Flags::FLAG_INDEXED_ALL, 
+ "t1".into(), + IPLD_RAW, + vec![ + 226, 71, 32, 244, 92, 183, 79, 45, 85, 241, 222, 235, 182, 9, 143, 80, 241, 11, + 81, 29, 171, 138, 125, 71, 196, 129, 154, 8, 220, 208, 184, 149, + ], + ), + Entry::new( + Flags::FLAG_INDEXED_ALL, + "t2".into(), + IPLD_RAW, + vec![ + 116, 4, 227, 209, 4, 234, 120, 65, 195, 217, 230, 253, 32, 173, 254, 153, 180, + 173, 88, 107, 192, 141, 143, 59, 211, 175, 239, 137, 76, 241, 132, 222, + ], + ), + Entry::new( + Flags::FLAG_INDEXED_ALL, + "d".into(), + IPLD_RAW, + vec![ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, + 254, 169, 229, 74, 6, 24, 52, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 13, 232, 134, 151, 206, 121, 139, 231, 226, 192, + ], + ), + ]; + + let topic0 = + EthHash::from_str("0xe24720f45cb74f2d55f1deebb6098f50f10b511dab8a7d47c4819a08dcd0b895") + .unwrap(); + + let topic1 = + EthHash::from_str("0x7404e3d104ea7841c3d9e6fd20adfe99b4ad586bc08d8f3bd3afef894cf184de") + .unwrap(); + + let topic2 = + EthHash::from_str("0x000000000000000000000000d0fb381fc644cdd5d694d35e1afb445527b9244b") + .unwrap(); + + let topic3 = + EthHash::from_str("0x00000000000000000000000092c3b379c217fdf8603884770e83fded7b7410f8") + .unwrap(); + + let spec1 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: Some(EthTopicSpec(vec![EthHashList::Single(None)])), + block_hash: None, + }; + + assert!(EthEventHandler::do_match(&spec1, ð_addr0, &entries0)); + + let spec2 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: Some(EthTopicSpec(vec![ + EthHashList::Single(None), + EthHashList::Single(None), + ])), + block_hash: None, + }; + + assert!(EthEventHandler::do_match(&spec2, ð_addr0, &entries0)); + + let spec2 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: Some(EthTopicSpec(vec![EthHashList::Single(Some( + topic0.clone(), + ))])), + block_hash: None, + }; + + assert!(EthEventHandler::do_match(&spec2, ð_addr0, &entries0)); + + let spec3 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: Some(EthTopicSpec(vec![EthHashList::List(vec![topic0.clone()])])), + block_hash: None, + }; + + assert!(EthEventHandler::do_match(&spec3, ð_addr0, &entries0)); + + let spec4 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: Some(EthTopicSpec(vec![EthHashList::List(vec![ + topic1.clone(), + topic0.clone(), + ])])), + block_hash: None, + }; + + assert!(EthEventHandler::do_match(&spec4, ð_addr0, &entries0)); + + let spec5 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: Some(EthTopicSpec(vec![EthHashList::Single(Some( + topic1.clone(), + ))])), + block_hash: None, + }; + + assert!(!EthEventHandler::do_match(&spec5, ð_addr0, &entries0)); + + let spec6 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: Some(EthTopicSpec(vec![EthHashList::List(vec![ + topic2.clone(), + topic3.clone(), + ])])), + block_hash: None, + }; + + assert!(!EthEventHandler::do_match(&spec6, ð_addr0, &entries0)); + + let spec7 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: Some(EthTopicSpec(vec![ + EthHashList::Single(Some(topic1.clone())), + EthHashList::Single(Some(topic1.clone())), + ])), + block_hash: None, + }; + + assert!(!EthEventHandler::do_match(&spec7, ð_addr0, &entries0)); + + let spec8 = EthFilterSpec { + from_block: None, + to_block: None, + address: vec![], + topics: 
Some(EthTopicSpec(vec![ + EthHashList::Single(Some(topic0.clone())), + EthHashList::Single(Some(topic1.clone())), + EthHashList::Single(Some(topic3.clone())), + ])), + block_hash: None, + }; + + assert!(!EthEventHandler::do_match(&spec8, ð_addr0, &entries0)); + } } diff --git a/src/rpc/methods/eth/types.rs b/src/rpc/methods/eth/types.rs index fce149517959..06fc904dd2ea 100644 --- a/src/rpc/methods/eth/types.rs +++ b/src/rpc/methods/eth/types.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::*; -use ethereum_types::H256; use libipld::error::SerdeError; use serde::de::{value::StringDeserializer, IntoDeserializer}; use uuid::Uuid; @@ -332,7 +331,9 @@ impl TryFrom for Message { derive_more::Into, )] #[displaydoc("{0:#x}")] -pub struct EthHash(#[schemars(with = "String")] pub H256); +pub struct EthHash(#[schemars(with = "String")] pub ethereum_types::H256); + +lotus_json_with_self!(EthHash); #[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Hash, Clone)] pub struct FilterID(EthHash); @@ -344,12 +345,21 @@ impl FilterID { let raw_id = Uuid::new_v4(); let mut id = [0u8; 32]; id[..16].copy_from_slice(raw_id.as_bytes()); - Ok(FilterID(EthHash(H256::from_slice(&id)))) + Ok(FilterID(EthHash(ethereum_types::H256::from_slice(&id)))) } } -#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] -pub struct EthHashList(pub Vec); +/// `EthHashList` represents a topic filter that can take one of two forms: +/// - `List`: Matches if the hash is present in the vector. +/// - `Single`: An optional hash, where: +/// - `Some(hash)`: Matches exactly this hash. +/// - `None`: Acts as a wildcard. +#[derive(PartialEq, Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(untagged)] +pub enum EthHashList { + List(Vec), + Single(Option), +} #[derive(Default, Serialize, Deserialize, Debug, Clone, JsonSchema)] pub struct EthTopicSpec(pub Vec); @@ -394,13 +404,25 @@ pub struct EthFilterSpec { #[serde(skip_serializing_if = "Option::is_none", default)] pub to_block: Option, pub address: Vec, - pub topics: EthTopicSpec, + pub topics: Option, #[serde(skip_serializing_if = "Option::is_none", default)] pub block_hash: Option, } - lotus_json_with_self!(EthFilterSpec); +/// `EthFilterResult` represents the response from executing a filter: +/// - A list of block hashes +/// - A list of transaction hashes +/// - Or a list of logs +#[derive(PartialEq, Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(untagged)] +pub enum EthFilterResult { + Blocks(Vec), + Txs(Vec), + Logs(Vec), +} +lotus_json_with_self!(EthFilterResult); + #[cfg(test)] mod tests { use super::*; diff --git a/src/rpc/methods/state.rs b/src/rpc/methods/state.rs index a09b8f92a5c8..d16bb72589aa 100644 --- a/src/rpc/methods/state.rs +++ b/src/rpc/methods/state.rs @@ -13,6 +13,7 @@ use crate::blocks::Tipset; use crate::chain::index::ResolveNullTipset; use crate::cid_collections::CidHashSet; use crate::eth::EthChainId; +use crate::interpreter::VMEvent; use crate::libp2p::NetworkMessage; use crate::lotus_json::lotus_json_with_self; use crate::networks::{ChainConfig, NetworkChain}; @@ -34,7 +35,7 @@ use crate::shim::{ state_tree::ActorState, version::NetworkVersion, }; use crate::state_manager::circulating_supply::GenesisInfo; -use crate::state_manager::MarketBalance; +use crate::state_manager::{MarketBalance, StateOutput}; use crate::utils::db::{ car_stream::{CarBlock, CarWriter}, BlockstoreExt as _, @@ -1345,12 +1346,13 @@ impl RpcMethod<1> for StateCompute { ctx.chain_store().heaviest_tipset(), 
ResolveNullTipset::TakeOlder, )?; - let (state_root, _) = ctx + let StateOutput { state_root, .. } = ctx .state_manager .compute_tipset_state( tipset, crate::state_manager::NO_CALLBACK, crate::interpreter::VMTrace::NotTraced, + VMEvent::NotPushed, ) .await?; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index ba87ad764171..2e907f5955cb 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -83,6 +83,7 @@ macro_rules! for_each_method { $callback!(crate::rpc::eth::EthGetBlockTransactionCountByHash); $callback!(crate::rpc::eth::EthGetBlockTransactionCountByNumber); $callback!(crate::rpc::eth::EthGetCode); + $callback!(crate::rpc::eth::EthGetLogs); $callback!(crate::rpc::eth::EthGetMessageCidByTransactionHash); $callback!(crate::rpc::eth::EthGetStorageAt); $callback!(crate::rpc::eth::EthGetTransactionByHash); diff --git a/src/rpc/types/mod.rs b/src/rpc/types/mod.rs index 7dc77ef2dc0b..cee44b243335 100644 --- a/src/rpc/types/mod.rs +++ b/src/rpc/types/mod.rs @@ -521,10 +521,10 @@ pub struct MiningBaseInfo { lotus_json_with_self!(MiningBaseInfo); -#[derive(Clone, JsonSchema, Serialize, Deserialize)] +#[derive(Debug, Clone, JsonSchema, Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] pub struct EventEntry { - pub flags: u8, + pub flags: u64, pub key: String, pub codec: u64, pub value: LotusJson>, diff --git a/src/shim/executor.rs b/src/shim/executor.rs index 3fc9c89be1c0..29b707ea9938 100644 --- a/src/shim/executor.rs +++ b/src/shim/executor.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::trace::ExecutionEvent; -use crate::shim::{econ::TokenAmount, fvm_shared_latest::error::ExitCode}; +use crate::shim::{ + econ::TokenAmount, fvm_shared_latest::error::ExitCode, fvm_shared_latest::ActorID, +}; use cid::Cid; use fil_actors_shared::fvm_ipld_amt::Amtv0; use fvm2::executor::ApplyRet as ApplyRet_v2; @@ -11,7 +13,13 @@ use fvm4::executor::ApplyRet as ApplyRet_v4; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::RawBytes; use fvm_shared2::receipt::Receipt as Receipt_v2; +use fvm_shared3::event::ActorEvent as ActorEvent_v3; +use fvm_shared3::event::Entry as Entry_v3; +use fvm_shared3::event::StampedEvent as StampedEvent_v3; pub use fvm_shared3::receipt::Receipt as Receipt_v3; +use fvm_shared4::event::ActorEvent as ActorEvent_v4; +use fvm_shared4::event::Entry as Entry_v4; +use fvm_shared4::event::StampedEvent as StampedEvent_v4; use fvm_shared4::receipt::Receipt as Receipt_v4; use serde::Serialize; @@ -105,6 +113,14 @@ impl ApplyRet { ApplyRet::V4(v4) => v4.exec_trace.iter().cloned().map(Into::into).collect(), } } + + pub fn events(&self) -> Vec { + match self { + ApplyRet::V2(_) => Vec::::default(), + ApplyRet::V3(v3) => v3.events.iter().cloned().map(Into::into).collect(), + ApplyRet::V4(v4) => v4.events.iter().cloned().map(Into::into).collect(), + } + } } // Note: it's impossible to properly derive Deserialize. 
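// Editor's note (illustrative sketch, not part of this diff): the next hunk adds
// version-agnostic shims (`Entry`, `ActorEvent`, `StampedEvent`) over the
// `fvm_shared3`/`fvm_shared4` event types. The sketch below shows how the event-filter
// code elsewhere in this diff reads them; the `use` path follows the hunks below, and
// the helper name is made up for illustration.
use crate::shim::executor::StampedEvent;

/// Emitting actor ID plus the number of key/value entries carried by the event,
/// mirroring what `EthEventHandler::collect_events` reads from each stamped event.
fn summarize_event(ev: &StampedEvent) -> (u64, usize) {
    (ev.emitter(), ev.event().entries().len())
}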
@@ -195,6 +211,135 @@ impl From for Receipt {
     }
 }
 
+#[derive(Clone, Debug)]
+pub enum Entry {
+    V3(Entry_v3),
+    V4(Entry_v4),
+}
+
+impl From<Entry_v3> for Entry {
+    fn from(other: Entry_v3) -> Self {
+        Self::V3(other)
+    }
+}
+
+impl From<Entry_v4> for Entry {
+    fn from(other: Entry_v4) -> Self {
+        Self::V4(other)
+    }
+}
+
+impl Entry {
+    #[cfg(test)]
+    pub fn new(
+        flags: crate::shim::fvm_shared_latest::event::Flags,
+        key: String,
+        codec: u64,
+        value: Vec<u8>,
+    ) -> Self {
+        Entry::V4(Entry_v4 {
+            flags,
+            key,
+            codec,
+            value,
+        })
+    }
+
+    pub fn into_parts(self) -> (u64, String, u64, Vec<u8>) {
+        match self {
+            Self::V3(v3) => {
+                let Entry_v3 {
+                    flags,
+                    key,
+                    codec,
+                    value,
+                } = v3;
+                (flags.bits(), key, codec, value)
+            }
+            Self::V4(v4) => {
+                let Entry_v4 {
+                    flags,
+                    key,
+                    codec,
+                    value,
+                } = v4;
+                (flags.bits(), key, codec, value)
+            }
+        }
+    }
+
+    pub fn value(&self) -> &Vec<u8> {
+        match self {
+            Self::V3(v3) => &v3.value,
+            Self::V4(v4) => &v4.value,
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub enum ActorEvent {
+    V3(ActorEvent_v3),
+    V4(ActorEvent_v4),
+}
+
+impl From<ActorEvent_v3> for ActorEvent {
+    fn from(other: ActorEvent_v3) -> Self {
+        ActorEvent::V3(other)
+    }
+}
+
+impl From<ActorEvent_v4> for ActorEvent {
+    fn from(other: ActorEvent_v4) -> Self {
+        ActorEvent::V4(other)
+    }
+}
+
+impl ActorEvent {
+    pub fn entries(&self) -> Vec<Entry> {
+        match self {
+            Self::V3(v3) => v3.entries.clone().into_iter().map(Into::into).collect(),
+            Self::V4(v4) => v4.entries.clone().into_iter().map(Into::into).collect(),
+        }
+    }
+}
+
+/// Event with extra information stamped by the FVM.
+#[derive(Clone, Debug)]
+pub enum StampedEvent {
+    V3(StampedEvent_v3),
+    V4(StampedEvent_v4),
+}
+
+impl From<StampedEvent_v3> for StampedEvent {
+    fn from(other: StampedEvent_v3) -> Self {
+        StampedEvent::V3(other)
+    }
+}
+
+impl From<StampedEvent_v4> for StampedEvent {
+    fn from(other: StampedEvent_v4) -> Self {
+        StampedEvent::V4(other)
+    }
+}
+
+impl StampedEvent {
+    /// Returns the ID of the actor that emitted this event.
+    pub fn emitter(&self) -> ActorID {
+        match self {
+            Self::V3(v3) => v3.emitter,
+            Self::V4(v4) => v4.emitter,
+        }
+    }
+
+    /// Returns the event as emitted by the actor.
+ pub fn event(&self) -> ActorEvent { + match self { + Self::V3(v3) => v3.event.clone().into(), + Self::V4(v4) => v4.event.clone().into(), + } + } +} + #[cfg(test)] impl quickcheck::Arbitrary for Receipt { fn arbitrary(g: &mut quickcheck::Gen) -> Self { diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index cf50250b8118..a30a495c58c7 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -17,7 +17,7 @@ use crate::chain::{ }; use crate::chain_sync::SyncConfig; use crate::interpreter::{ - resolve_to_key_addr, ApplyResult, BlockMessages, CalledAt, ExecutionContext, + resolve_to_key_addr, ApplyResult, BlockMessages, CalledAt, ExecutionContext, VMEvent, IMPLICIT_MESSAGE_GAS_LIMIT, VM, }; use crate::interpreter::{MessageCallbackCtx, VMTrace}; @@ -32,13 +32,12 @@ use crate::shim::{ miner::MinerStateExt as _, state_load::*, verifreg::VerifiedRegistryStateExt as _, LoadActorStateFromBlockstore, }, - executor::ApplyRet, + executor::{ApplyRet, Receipt, StampedEvent}, }; use crate::shim::{ address::{Address, Payload, Protocol}, clock::ChainEpoch, econ::TokenAmount, - executor::Receipt, message::Message, randomness::Randomness, state_tree::{ActorState, StateTree}, @@ -82,17 +81,56 @@ pub use utils::is_valid_for_sending; const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize); +const DEFAULT_EVENT_CACHE_SIZE: NonZeroUsize = nonzero!(4096usize); + /// Intermediary for retrieving state objects and updating actor states. type CidPair = (Cid, Cid); +#[derive(Clone)] +pub struct StateOutput { + pub state_root: Cid, + pub receipt_root: Cid, + pub events: Vec>, +} + +#[derive(Clone)] +pub struct StateOutputValue { + pub state_root: Cid, + pub receipt_root: Cid, +} + +impl From for StateOutput { + fn from(value: StateOutputValue) -> Self { + Self { + state_root: value.state_root, + receipt_root: value.receipt_root, + events: vec![], + } + } +} + +impl From for StateOutputValue { + fn from(value: StateOutput) -> Self { + StateOutputValue { + state_root: value.state_root, + receipt_root: value.receipt_root, + } + } +} + +#[derive(Clone)] +pub struct StateEvents { + pub events: Vec>, +} + // Various structures for implementing the tipset state cache -struct TipsetStateCacheInner { - values: LruCache, +struct TipsetStateCacheInner { + values: LruCache, pending: Vec<(TipsetKey, Arc>)>, } -impl Default for TipsetStateCacheInner { +impl Default for TipsetStateCacheInner { fn default() -> Self { Self { values: LruCache::new(DEFAULT_TIPSET_CACHE_SIZE), @@ -101,37 +139,52 @@ impl Default for TipsetStateCacheInner { } } -struct TipsetStateCache { - cache: Arc>, +impl TipsetStateCacheInner { + pub fn with_size(cache_size: NonZeroUsize) -> Self { + Self { + values: LruCache::new(cache_size), + pending: Vec::with_capacity(8), + } + } } -enum Status { - Done(CidPair), +struct TipsetStateCache { + cache: Arc>>, +} + +enum Status { + Done(V), Empty(Arc>), } -impl TipsetStateCache { +impl TipsetStateCache { pub fn new() -> Self { Self { cache: Arc::new(SyncMutex::new(TipsetStateCacheInner::default())), } } + pub fn with_size(cache_size: NonZeroUsize) -> Self { + Self { + cache: Arc::new(SyncMutex::new(TipsetStateCacheInner::with_size(cache_size))), + } + } + fn with_inner(&self, func: F) -> T where - F: FnOnce(&mut TipsetStateCacheInner) -> T, + F: FnOnce(&mut TipsetStateCacheInner) -> T, { let mut lock = self.cache.lock(); func(&mut lock) } - pub async fn get_or_else(&self, key: &TipsetKey, compute: F) -> anyhow::Result + pub async fn get_or_else(&self, key: &TipsetKey, compute: F) 
-> anyhow::Result where F: Fn() -> Fut, - Fut: core::future::Future>, + Fut: core::future::Future>, { let status = self.with_inner(|inner| match inner.values.get(key) { - Some(v) => Status::Done(*v), + Some(v) => Status::Done(v.clone()), None => { let option = inner .pending @@ -172,22 +225,22 @@ impl TipsetStateCache { .get_or_create(&crate::metrics::values::STATE_MANAGER_TIPSET) .inc(); - let cid_pair = compute().await?; + let value = compute().await?; // Write back to cache, release lock and return value - self.insert(key.clone(), cid_pair); - Ok(cid_pair) + self.insert(key.clone(), value.clone()); + Ok(value) } } } } } - fn get(&self, key: &TipsetKey) -> Option { - self.with_inner(|inner| inner.values.get(key).copied()) + fn get(&self, key: &TipsetKey) -> Option { + self.with_inner(|inner| inner.values.get(key).cloned()) } - fn insert(&self, key: TipsetKey, value: CidPair) { + fn insert(&self, key: TipsetKey, value: V) { self.with_inner(|inner| { inner.pending.retain(|(k, _)| k != &key); inner.values.put(key, value); @@ -217,7 +270,9 @@ pub struct StateManager { cs: Arc>, /// This is a cache which indexes tipsets to their calculated state. - cache: TipsetStateCache, + cache: TipsetStateCache, + /// This is a cache dedicated to tipset events. + events_cache: TipsetStateCache, // Beacon can be cheaply crated from the `chain_config`. The only reason we // store it here is because it has a look-up cache. beacon: Arc, @@ -244,6 +299,7 @@ where Ok(Self { cs, cache: TipsetStateCache::new(), + events_cache: TipsetStateCache::with_size(DEFAULT_EVENT_CACHE_SIZE), beacon, chain_config, sync_config, @@ -416,16 +472,60 @@ where /// state for a given tipset is guaranteed not to be computed twice. #[instrument(skip(self))] pub async fn tipset_state(self: &Arc, tipset: &Arc) -> anyhow::Result { + let StateOutput { + state_root, + receipt_root, + .. + } = self.tipset_state_output(tipset).await?; + Ok((state_root, receipt_root)) + } + + #[instrument(skip(self))] + pub async fn tipset_state_output( + self: &Arc, + tipset: &Arc, + ) -> anyhow::Result { let key = tipset.key(); self.cache .get_or_else(key, || async move { let ts_state = self - .compute_tipset_state(Arc::clone(tipset), NO_CALLBACK, VMTrace::NotTraced) - .await?; + .compute_tipset_state( + Arc::clone(tipset), + NO_CALLBACK, + VMTrace::NotTraced, + VMEvent::NotPushed, + ) + .await? 
+                    .into();
                 trace!("Completed tipset state calculation {:?}", tipset.cids());
                 Ok(ts_state)
             })
             .await
+            .map(StateOutput::from)
+    }
+
+    #[instrument(skip(self))]
+    pub async fn tipset_state_events(
+        self: &Arc<Self>,
+        tipset: &Arc<Tipset>,
+    ) -> anyhow::Result<StateEvents> {
+        let key = tipset.key();
+        self.events_cache
+            .get_or_else(key, || async move {
+                let ts_state = self
+                    .compute_tipset_state(
+                        Arc::clone(tipset),
+                        NO_CALLBACK,
+                        VMTrace::NotTraced,
+                        VMEvent::Pushed,
+                    )
+                    .await?;
+                trace!("Completed tipset state calculation {:?}", tipset.cids());
+                Ok(StateEvents {
+                    events: ts_state.events,
+                })
+            })
+            .await
     }
 
     #[instrument(skip(self, rand))]
@@ -620,7 +720,12 @@ where
                 _ => Ok(()), // ignored
             }
         };
-        let result = self.compute_tipset_state_blocking(ts, Some(callback), VMTrace::Traced);
+        let result = self.compute_tipset_state_blocking(
+            ts,
+            Some(callback),
+            VMTrace::Traced,
+            VMEvent::NotPushed,
+        );
         if let Err(error_message) = result {
             if error_message.to_string() != REPLAY_HALT {
                 return Err(Error::Other(format!(
@@ -711,10 +816,16 @@ where
         tipset: Arc<Tipset>,
         callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()> + Send + 'static>,
         enable_tracing: VMTrace,
-    ) -> Result<CidPair, Error> {
+        enable_event_pushing: VMEvent,
+    ) -> Result<StateOutput, Error> {
         let this = Arc::clone(self);
         tokio::task::spawn_blocking(move || {
-            this.compute_tipset_state_blocking(tipset, callback, enable_tracing)
+            this.compute_tipset_state_blocking(
+                tipset,
+                callback,
+                enable_tracing,
+                enable_event_pushing,
+            )
         })
         .await?
     }
@@ -726,7 +837,8 @@ where
         tipset: Arc<Tipset>,
         callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
         enable_tracing: VMTrace,
-    ) -> Result<CidPair, Error> {
+        enable_event_pushing: VMEvent,
+    ) -> Result<StateOutput, Error> {
         Ok(apply_block_messages(
             self.chain_store().genesis_block_header().timestamp,
             Arc::clone(&self.chain_store().chain_index),
@@ -736,6 +848,7 @@ where
             tipset,
             callback,
             enable_tracing,
+            enable_event_pushing,
         )?)
     }
 
@@ -1466,7 +1579,11 @@ where
             .par_bridge()
             .try_for_each(|(child, parent)| {
                 info!(height = parent.epoch(), "compute parent state");
-                let (actual_state, actual_receipt) = apply_block_messages(
+                let StateOutput {
+                    state_root: actual_state,
+                    receipt_root: actual_receipt,
+                    ..
+                } = apply_block_messages(
                     genesis_timestamp,
                     chain_index.clone(),
                     chain_config.clone(),
@@ -1475,6 +1592,7 @@ where
                     parent,
                     NO_CALLBACK,
                     VMTrace::NotTraced,
+                    VMEvent::NotPushed,
                 )
                 .context("couldn't compute tipset state")?;
                 let expected_receipt = child.min_ticket_block().message_receipts;
@@ -1582,7 +1700,8 @@ pub fn apply_block_messages(
     tipset: Arc<Tipset>,
     mut callback: Option<impl FnMut(MessageCallbackCtx<'_>) -> anyhow::Result<()>>,
     enable_tracing: VMTrace,
-) -> Result<CidPair, anyhow::Error>
+    enable_event_pushing: VMEvent,
+) -> Result<StateOutput, anyhow::Error>
 where
     DB: Blockstore + Send + Sync + 'static,
 {
@@ -1600,7 +1719,11 @@ where
         // magical genesis miner, this won't work properly, so we short circuit here
         // This avoids the question of 'who gets paid the genesis block reward'
         let message_receipts = tipset.min_ticket_block().message_receipts;
-        return Ok((*tipset.parent_state(), message_receipts));
+        return Ok(StateOutput {
+            state_root: *tipset.parent_state(),
+            receipt_root: message_receipts,
+            events: vec![],
+        });
     }
 
     let _timer = metrics::APPLY_BLOCKS_TIME.start_timer();
@@ -1668,16 +1791,21 @@ where
     // FVM requires a stack size of 64MiB. The alternative is to use `ThreadedExecutor` from
     // FVM, but that introduces some constraints, and possible deadlocks.
-    stacker::grow(64 << 20, || -> anyhow::Result<(Cid, Cid)> {
+    stacker::grow(64 << 20, || -> anyhow::Result<StateOutput> {
         let mut vm = create_vm(parent_state, epoch, tipset.min_timestamp())?;
 
         // step 4: apply tipset messages
-        let receipts = vm.apply_block_messages(&block_messages, epoch, callback)?;
+        let (receipts, events) =
+            vm.apply_block_messages(&block_messages, epoch, callback, enable_event_pushing)?;
 
         // step 5: construct receipt root from receipts and flush the state-tree
         let receipt_root = Amt::new_from_iter(&chain_index.db, receipts)?;
         let state_root = vm.flush()?;
 
-        Ok((state_root, receipt_root))
+        Ok(StateOutput {
+            state_root,
+            receipt_root,
+            events,
+        })
     })
 }
diff --git a/src/test_utils/mod.rs b/src/test_utils/mod.rs
index 9f6c974750c2..acd7587c0e93 100644
--- a/src/test_utils/mod.rs
+++ b/src/test_utils/mod.rs
@@ -35,6 +35,19 @@ pub fn construct_messages() -> (Message, SignedMessage) {
     (bls_messages, secp_messages)
 }
 
+/// Returns a tuple of unsigned and BLS-signed messages used for testing
+pub fn construct_bls_messages() -> (Message, SignedMessage) {
+    let message: Message = Message_v3 {
+        to: Address::new_id(1).into(),
+        from: Address::new_id(2).into(),
+        ..Message_v3::default()
+    }
+    .into();
+
+    let bls_message = SignedMessage::new_unchecked(message.clone(), Signature::new_bls(vec![0]));
+    (message, bls_message)
+}
+
 /// Returns a tuple of unsigned and signed messages used for testing the Ethereum mapping
 pub fn construct_eth_messages(sequence: u64) -> (Message, SignedMessage) {
     let mut bls_message: Message = Message_v3 {
diff --git a/src/tool/subcommands/api_cmd.rs b/src/tool/subcommands/api_cmd.rs
index b23743d3890f..0d991a4d41de 100644
--- a/src/tool/subcommands/api_cmd.rs
+++ b/src/tool/subcommands/api_cmd.rs
@@ -1343,7 +1343,7 @@ fn eth_tests() -> Vec {
                     "0xff38c072f286e3b20b3954ca9f99c05fbecc64aa",
                 )
                 .unwrap()],
-                topics: EthTopicSpec(vec![]),
+                topics: None,
                 block_hash: None,
             },),
             use_alias,
@@ -1522,6 +1522,16 @@ fn eth_tests_with_tipset(store: &Arc, shared_tipset: &Tipset
             ))
             .unwrap(),
         ),
+        RpcTest::identity(
+            EthGetLogs::request((EthFilterSpec {
+                from_block: Some(format!("0x{:x}", shared_tipset.epoch())),
+                to_block: Some(format!("0x{:x}", shared_tipset.epoch())),
+                address: vec![],
+                topics: None,
+                block_hash: None,
+            },))
+            .unwrap(),
+        ),
         RpcTest::identity(EthGetTransactionHashByCid::request((block_cid,)).unwrap()),
     ];
diff --git a/src/tool/subcommands/archive_cmd.rs b/src/tool/subcommands/archive_cmd.rs
index b5c873b3b227..ce081d850a58 100644
--- a/src/tool/subcommands/archive_cmd.rs
+++ b/src/tool/subcommands/archive_cmd.rs
@@ -35,14 +35,14 @@ use crate::cid_collections::CidHashSet;
 use crate::cli_shared::{snapshot, snapshot::TrustedVendor};
 use crate::db::car::ManyCar;
 use crate::db::car::{AnyCar, RandomAccessFileReader};
-use crate::interpreter::VMTrace;
+use crate::interpreter::{VMEvent, VMTrace};
 use crate::ipld::{stream_graph, unordered_stream_graph};
 use crate::networks::{butterflynet, calibnet, mainnet, ChainConfig, NetworkChain};
 use crate::shim::address::CurrentNetwork;
 use crate::shim::clock::{ChainEpoch, EPOCHS_IN_DAY, EPOCH_DURATION_SECONDS};
 use crate::shim::fvm_shared_latest::address::Network;
 use crate::shim::machine::MultiEngine;
-use crate::state_manager::{apply_block_messages, NO_CALLBACK};
+use crate::state_manager::{apply_block_messages, StateOutput, NO_CALLBACK};
 use anyhow::{bail, Context as _};
 use chrono::DateTime;
 use cid::Cid;
@@ -546,7 +546,7 @@ async fn show_tipset_diff(
         ResolveNullTipset::TakeNewer,
     )?;
 
-    let (state_root, _) = apply_block_messages(
+    let StateOutput { state_root, .. } = apply_block_messages(
         timestamp,
         Arc::new(chain_index),
         Arc::new(chain_config),
@@ -555,6 +555,7 @@ async fn show_tipset_diff(
         tipset,
         NO_CALLBACK,
         VMTrace::NotTraced,
+        VMEvent::NotPushed,
     )?;
 
     if child_tipset.parent_state() != &state_root {
diff --git a/src/tool/subcommands/snapshot_cmd.rs b/src/tool/subcommands/snapshot_cmd.rs
index bd4956274943..b9a096bebef8 100644
--- a/src/tool/subcommands/snapshot_cmd.rs
+++ b/src/tool/subcommands/snapshot_cmd.rs
@@ -8,14 +8,14 @@ use crate::cli_shared::snapshot;
 use crate::daemon::bundle::load_actor_bundles;
 use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
 use crate::db::car::{AnyCar, ManyCar};
-use crate::interpreter::{MessageCallbackCtx, VMTrace};
+use crate::interpreter::{MessageCallbackCtx, VMEvent, VMTrace};
 use crate::ipld::stream_chain;
 use crate::networks::{butterflynet, calibnet, mainnet, ChainConfig, NetworkChain};
 use crate::shim::address::CurrentNetwork;
 use crate::shim::clock::ChainEpoch;
 use crate::shim::fvm_shared_latest::address::Network;
 use crate::shim::machine::MultiEngine;
-use crate::state_manager::apply_block_messages;
+use crate::state_manager::{apply_block_messages, StateOutput};
 use crate::utils::db::car_stream::CarStream;
 use crate::utils::proofs_api::ensure_params_downloaded;
 use anyhow::{bail, Context as _};
@@ -481,7 +481,7 @@ fn print_computed_state(snapshot: PathBuf, epoch: ChainEpoch, json: bool) -> any
     let mut message_calls = vec![];
 
-    let (state_root, _) = apply_block_messages(
+    let StateOutput { state_root, .. } = apply_block_messages(
         timestamp,
         Arc::new(chain_index),
         Arc::new(chain_config),
@@ -505,6 +505,7 @@ fn print_computed_state(snapshot: PathBuf, epoch: ChainEpoch, json: bool) -> any
             true => VMTrace::Traced,
             false => VMTrace::NotTraced,
         }, // enable traces if json flag is used
+        VMEvent::NotPushed,
     )?;
 
     if json {