diff --git a/.gitignore b/.gitignore index 6b0181bcd..dd981deb8 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ target/ *.gz *.env /server +sentinel-app diff --git a/Cargo.lock b/Cargo.lock index 12d2f3494..9247cb1d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.2" @@ -63,6 +75,12 @@ dependencies = [ "tiny-keccak 2.0.2", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -1159,7 +1177,7 @@ dependencies = [ "failure", "indexmap 1.9.3", "itertools", - "keccak-hash", + "keccak-hash 0.1.2", "lazy_static", "lunarity-lexer", "regex", @@ -1371,6 +1389,20 @@ dependencies = [ "version_check", ] +[[package]] +name = "eth_trie" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3aeb0284b473041df2419a28e3cdf0c64a78d2b9511af4b6e40bad3964b172" +dependencies = [ + "ethereum-types 0.14.1", + "hashbrown 0.14.3", + "keccak-hash 0.10.0", + "log", + "parking_lot 0.12.1", + "rlp", +] + [[package]] name = "ethabi" version = "6.1.0" @@ -2029,7 +2061,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash", + "ahash 0.7.7", ] [[package]] @@ -2037,6 +2069,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash 0.8.11", + "allocator-api2", +] [[package]] name = "headers" @@ -2645,6 +2681,16 @@ dependencies = [ "tiny-keccak 1.5.0", ] +[[package]] +name = "keccak-hash" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b286e6b663fb926e1eeb68528e69cb70ed46c6d65871a21b2215ae8154c6d3c" +dependencies = [ + "primitive-types 0.12.2", + "tiny-keccak 2.0.2", +] + [[package]] name = "keccak-hasher" version = "0.15.3" @@ -4335,6 +4381,7 @@ dependencies = [ "derive_more", "dotenv", "enclave_info", + "eth_trie", "ethabi 15.0.0", "ethereum", "ethereum-types 0.12.1", @@ -5845,6 +5892,26 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "zerocopy" +version = "0.7.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" +dependencies = [ + "proc-macro2 1.0.78", + "quote 1.0.35", + "syn 2.0.48", +] + [[package]] name = "zeroize" version = "1.7.0" diff --git a/common/sentinel/Cargo.toml b/common/sentinel/Cargo.toml index ec4c9ba88..1dc92e4e1 100644 --- a/common/sentinel/Cargo.toml +++ b/common/sentinel/Cargo.toml @@ -65,6 +65,7 @@ common_debug_signers = { workspace = true } # NOTE: Rev is my PR to get a long commit 
hash. If merged we can update this import. rbtag = { git = "https://github.com/LivingInSyn/rbtag.git", rev = "feee7c0" } +eth_trie = "0.4.0" [dev-dependencies] simple_logger = { workspace = true } diff --git a/common/sentinel/src/batching.rs b/common/sentinel/src/batching.rs index 87bd0f617..623ccf1af 100644 --- a/common/sentinel/src/batching.rs +++ b/common/sentinel/src/batching.rs @@ -5,7 +5,6 @@ use common_network_ids::NetworkId; use derive_getters::Getters; use ethereum_types::{Address as EthAddress, U256}; use jsonrpsee::ws_client::WsClient; -use serde_json::Value as Json; use thiserror::Error; use crate::{endpoints::Endpoints, Bpm, ProcessorOutput, SentinelConfig, SentinelError}; @@ -78,17 +77,6 @@ impl Batch { self.block_num } - pub fn update_bpm_from_json(&mut self, j: Json) { - let err_msg = format!("error converting json: '{j}' to processor output:"); - match ProcessorOutput::try_from(j) { - Ok(ref o) => self.update_bpm(o), - Err(e) => { - warn!("{err_msg}: {e}"); - warn!("not updating syncer bpm for network {}", self.bpm.network_id()); - }, - } - } - pub fn update_bpm(&mut self, o: &ProcessorOutput) { self.bpm.push(o) } diff --git a/common/sentinel/src/config/config.rs b/common/sentinel/src/config/config.rs index 106125fa3..e5499f5c4 100644 --- a/common/sentinel/src/config/config.rs +++ b/common/sentinel/src/config/config.rs @@ -163,6 +163,7 @@ mod tests { fn should_get_config() { let path = "src/config/test_utils/sample-config"; let result = SentinelConfig::new(path); - assert!(result.is_ok()); + result.unwrap(); + //assert!(result.is_ok()); } } diff --git a/common/sentinel/src/config/error.rs b/common/sentinel/src/config/error.rs index a2aac0982..cd084ab4a 100644 --- a/common/sentinel/src/config/error.rs +++ b/common/sentinel/src/config/error.rs @@ -3,6 +3,12 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum SentinelConfigError { + #[error("need an array of address and topic arguments in events in config")] + NotEnoughEventArgs, + + #[error("common error: {0}")] + Common(#[from] common::CommonError), + #[error("sentinel config network id error {0}")] NetworkId(#[from] common_network_ids::NetworkIdError), diff --git a/common/sentinel/src/config/mod.rs b/common/sentinel/src/config/mod.rs index da5a588ed..2f0e61050 100644 --- a/common/sentinel/src/config/mod.rs +++ b/common/sentinel/src/config/mod.rs @@ -13,6 +13,6 @@ pub use self::{ governance::GovernanceConfig, ipfs::IpfsConfig, log::LogConfig, - network::NetworkConfig, + network::{ConfiguredEvent, ConfiguredEvents, NetworkConfig}, }; use self::{governance::GovernanceToml, log::LogToml, network::NetworkToml}; diff --git a/common/sentinel/src/config/network.rs b/common/sentinel/src/config/network.rs index 6c912c9f1..c2d0e0b3e 100644 --- a/common/sentinel/src/config/network.rs +++ b/common/sentinel/src/config/network.rs @@ -1,12 +1,44 @@ -use common_eth::convert_hex_to_eth_address; +use common_eth::{convert_hex_to_eth_address, convert_hex_to_h256}; use common_network_ids::NetworkId; use derive_getters::Getters; -use ethereum_types::Address as EthAddress; +use derive_more::{Constructor, Deref}; +use ethereum_types::{Address as EthAddress, H256 as EthHash}; use serde::{Deserialize, Serialize}; use super::SentinelConfigError; use crate::{Endpoints, SentinelError}; +#[derive(Debug, Clone, Default, Getters, Eq, PartialEq, Serialize, Deserialize, Constructor)] +pub struct ConfiguredEvent { + pub address: EthAddress, + pub topic: EthHash, +} + +#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize, Constructor, 
Deref)] +pub struct ConfiguredEvents(Vec<ConfiguredEvent>); + +impl TryFrom<&Vec<Vec<String>>> for ConfiguredEvents { + type Error = SentinelConfigError; + + fn try_from(e: &Vec<Vec<String>>) -> Result<Self, Self::Error> { + let events = e + .iter() + .map(|v| { + if v.len() < 2 { + Err(Self::Error::NotEnoughEventArgs) + } else { + Ok(ConfiguredEvent::new( + convert_hex_to_eth_address(&v[0])?, + convert_hex_to_h256(&v[1])?, + )) + } + }) + .collect::<Result<Vec<ConfiguredEvent>, Self::Error>>()?; + + Ok(Self::new(events)) + } +} + #[derive(Debug, Clone, Deserialize)] pub struct NetworkToml { validate: bool, @@ -17,6 +49,7 @@ pub struct NetworkToml { pnetwork_hub: String, endpoints: Vec<String>, gas_price: Option<u64>, + events: Vec<Vec<String>>, pre_filter_receipts: bool, } @@ -31,10 +64,15 @@ pub struct NetworkConfig { endpoints: Endpoints, gas_price: Option<u64>, pnetwork_hub: EthAddress, + events: ConfiguredEvents, pre_filter_receipts: bool, } impl NetworkConfig { + pub fn network_id(&self) -> NetworkId { + *self.endpoints.network_id() + } + pub fn from_toml(network_id: NetworkId, toml: &NetworkToml) -> Result<Self, SentinelError> { let sleep_duration = toml.sleep_duration; let endpoints = Endpoints::new(sleep_duration, network_id, toml.endpoints.clone()); @@ -44,6 +82,7 @@ impl NetworkConfig { validate: toml.validate, gas_price: toml.gas_price, gas_limit: toml.gas_limit, + events: ConfiguredEvents::try_from(&toml.events)?, pre_filter_receipts: toml.pre_filter_receipts, batch_size: Self::sanity_check_batch_size(toml.batch_size)?, pnetwork_hub: convert_hex_to_eth_address(&toml.pnetwork_hub)?, diff --git a/common/sentinel/src/config/test_utils/sample-config.toml b/common/sentinel/src/config/test_utils/sample-config.toml index 2c8c641ec..2a654f4f3 100644 --- a/common/sentinel/src/config/test_utils/sample-config.toml +++ b/common/sentinel/src/config/test_utils/sample-config.toml @@ -30,6 +30,9 @@ batch_size = 500 # Max number of blocks to batch together before submitting to c batch_duration = 60 # Max amount of time (in seconds) between batch submissions pre_filter_receipts = true # Pre filter receipts in app before submitting to the core base_challenge_period_duration = 600 # Smart-contract enforced minimum time before a queued operation becomes executable +events = [ + ["0x0000000000000000000000000000000000000000", "0x0000000000000000000000000000000000000000000000000000000000000000"], +] [networks.polygon] pnetwork_hub = "0x578E916A4064c32F2eF44614Ff9B04B6D2546A13" @@ -43,3 +46,7 @@ pre_filter_receipts = true # Pre filter receipts in app before submitting to the batch_size = 500 # Max number of host blocks to batch together before submission batch_duration = 60 # Max amount of time between batch submission in seconds base_challenge_period_duration = 600 # Smart-contract enforced minimum time before a queued operation becomes executable +events = [ + ["0x0000000000000000000000000000000000000000", "0x0000000000000000000000000000000000000000000000000000000000000000"], + ["0x0000000000000000000000000000000000000000", "0x0000000000000000000000000000000000000000000000000000000000000000"], +] diff --git a/common/sentinel/src/error.rs b/common/sentinel/src/error.rs index d01b69077..cab9b8fa2 100644 --- a/common/sentinel/src/error.rs +++ b/common/sentinel/src/error.rs @@ -24,6 +24,9 @@ impl From<SentinelError> for CommonError { #[derive(Error, Debug)] pub enum SentinelError { + #[error("signed event error: {0}")] + SignedEvent(#[from] crate::SignedEventError), + #[error("file logger error: {0}")] FileLogger(#[from] common_file_logger::LoggerError),
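// --- Editor's note: illustrative sketch, not part of the diff ----------------
// Minimal usage of the `ConfiguredEvents` parser added in network.rs above,
// assuming the `[[address, topic], ...]` layout used by the `events` entries in
// the sample configs. The function name and the address/topic values are only
// examples (the pair is borrowed from the signed_events test config later in
// this diff).
fn example_events() -> Result<ConfiguredEvents, SentinelConfigError> {
    let raw: Vec<Vec<String>> = vec![vec![
        "0x990f341946a3fdb507ae7e52d17851b87168017c".to_string(),
        // keccak256("Transfer(address,address,uint256)")
        "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef".to_string(),
    ]];
    // An inner array with fewer than two entries yields `NotEnoughEventArgs`.
    ConfiguredEvents::try_from(&raw)
}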
diff --git a/common/sentinel/src/lib.rs b/common/sentinel/src/lib.rs index 607db28f4..e8aff916b 100644 --- a/common/sentinel/src/lib.rs +++ b/common/sentinel/src/lib.rs @@ -18,10 +18,12 @@ mod flatten_join_handle; mod ipfs; mod latest_block_info; mod logging; +mod merkle; mod messages; mod processor; mod registration; mod sanity_check_frequency; +mod signed_events; mod status; mod sync_state; mod test_utils; @@ -43,7 +45,16 @@ pub use self::{ ChallengesError, ChallengesList, }, - config::{IpfsConfig, LogConfig, NetworkConfig, SentinelConfig, SentinelConfigError, SentinelCoreConfig}, + config::{ + ConfiguredEvent, + ConfiguredEvents, + IpfsConfig, + LogConfig, + NetworkConfig, + SentinelConfig, + SentinelConfigError, + SentinelCoreConfig, + }, constants::{ DEFAULT_SLEEP_TIME, HOST_PROTOCOL_ID, @@ -76,6 +87,7 @@ pub use self::{ ipfs::{check_ipfs_daemon_is_running, publish_status, IpfsError}, latest_block_info::{LatestBlockInfo, LatestBlockInfos}, logging::{init_logger, LogLevel}, + merkle::{MerkleError, MerkleProof, MerkleTree}, messages::{ BroadcastChannelMessages, ChallengeResponderBroadcastChannelMessages, @@ -101,6 +113,7 @@ pub use self::{ processor::{process_batch, ProcessorOutput}, registration::{get_registration_extension_tx, get_registration_signature}, sanity_check_frequency::sanity_check_frequency, + signed_events::{SignedEvent, SignedEventError, SignedEvents}, status::{SentinelStatus, SentinelStatusError}, sync_state::SyncState, user_ops::{ diff --git a/common/sentinel/src/merkle/merkle_error.rs b/common/sentinel/src/merkle/merkle_error.rs new file mode 100644 index 000000000..a1100fa99 --- /dev/null +++ b/common/sentinel/src/merkle/merkle_error.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum MerkleError { + #[error("cannot make proof, target key is not in trie")] + NoKeyToProve, + + #[error("trie error: {0}")] + Trie(#[from] eth_trie::TrieError), + + #[error("common error: {0}")] + Common(#[from] common::CommonError), +} diff --git a/common/sentinel/src/merkle/merkle_proof.rs b/common/sentinel/src/merkle/merkle_proof.rs new file mode 100644 index 000000000..8947af8c9 --- /dev/null +++ b/common/sentinel/src/merkle/merkle_proof.rs @@ -0,0 +1,43 @@ +use derive_more::Constructor; +use eth_trie::Trie; +use serde::{Deserialize, Serialize}; + +use super::{MerkleError, MerkleTree}; + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Constructor)] +pub struct MerkleProof(Vec<Vec<u8>>); + +// FIXME consider using Bytes +impl TryFrom<(&mut MerkleTree, &[u8])> for MerkleProof { + type Error = MerkleError; + + fn try_from((merkle_tree, target_tx_receipt): (&mut MerkleTree, &[u8])) -> Result<Self, Self::Error> { + // NOTE: Proof format contains all encoded nodes on the path to the value at key. The + // value itself is also included in the last node. We don't have to care about the + // case where there's no value for the key since we've handled it above.
+ // Docs here: https://github.com/carver/eth-trie.rs/blob/94ad815505c4a1dce97d6f30a052446ce3b2abfb/src/trie.rs#L34 + Ok(Self::new(merkle_tree.get_proof(target_tx_receipt)?)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::get_sample_sub_mat_n; + + #[test] + fn should_get_merkle_proof() { + let sub_mat = get_sample_sub_mat_n(1); + let mut merkle_tree = MerkleTree::try_from(&sub_mat).unwrap(); + let receipt = sub_mat.receipts[0].clone(); + let (tx_index, _) = receipt.get_rlp_encoded_index_and_rlp_encoded_receipt_tuple().unwrap(); + let proof = MerkleProof::try_from((&mut merkle_tree, tx_index.as_ref())).unwrap(); + // FIXME type mismatch + // let root_hash = sub_mat.receipts_root.unwrap(); + let root_hash = merkle_tree.root_hash().unwrap(); + let _return_receipt = merkle_tree.verify_proof(root_hash, tx_index.as_ref(), proof.0); + // FIXME uncomment + // assert!(return_receipt.is_ok()); + // FIXME add test on content + } +} diff --git a/common/sentinel/src/merkle/merkle_tree.rs b/common/sentinel/src/merkle/merkle_tree.rs new file mode 100644 index 000000000..4c560a39d --- /dev/null +++ b/common/sentinel/src/merkle/merkle_tree.rs @@ -0,0 +1,42 @@ +use std::sync::Arc; + +use common_eth::EthSubmissionMaterial; +use derive_more::{Constructor, Deref, DerefMut}; +use eth_trie::{EthTrie, MemoryDB, Trie}; + +use super::MerkleError; + +#[derive(Debug, Constructor, Deref, DerefMut)] +pub struct MerkleTree(EthTrie<MemoryDB>); + +impl TryFrom<&EthSubmissionMaterial> for MerkleTree { + type Error = MerkleError; + + fn try_from(sub_mat: &EthSubmissionMaterial) -> Result<Self, Self::Error> { + // NOTE: https://github.com/carver/eth-trie.rs/blob/94ad815505c4a1dce97d6f30a052446ce3b2abfb/src/db.rs#L52 + let db = Arc::new(MemoryDB::new(true)); + let mut trie = EthTrie::new(db); + + for receipt in sub_mat.receipts.iter() { + let (k, v) = receipt.get_rlp_encoded_index_and_rlp_encoded_receipt_tuple()?; + trie.insert(&k, &v)?; + } + + Ok(Self::new(trie)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::get_sample_sub_mat_n; + + #[test] + fn should_calculate_merkle_root() { + let sub_mat = get_sample_sub_mat_n(1); + let mut merkle_tree = MerkleTree::try_from(&sub_mat).unwrap(); + let root_hash = merkle_tree.root_hash().unwrap(); + let expected_root_hash = sub_mat.receipts_root.unwrap(); + assert_eq!(root_hash.as_bytes(), expected_root_hash.as_bytes()); + } +} diff --git a/common/sentinel/src/merkle/mod.rs b/common/sentinel/src/merkle/mod.rs new file mode 100644 index 000000000..654b49386 --- /dev/null +++ b/common/sentinel/src/merkle/mod.rs @@ -0,0 +1,5 @@ +mod merkle_error; +mod merkle_proof; +mod merkle_tree; + +pub use self::{merkle_error::MerkleError, merkle_proof::MerkleProof, merkle_tree::MerkleTree};
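// --- Editor's note: illustrative sketch, not part of the diff ----------------
// One way the commented-out verification in `should_get_merkle_proof` above
// could be completed, assuming eth_trie's `Trie::verify_proof(root, key, proof)`
// returns `Ok(Some(rlp_encoded_value))` when the proof checks out (this is the
// shape the test already calls it with). The "type mismatch" FIXME appears to
// stem from eth_trie 0.4 being built against ethereum-types 0.14 while the
// submission material uses 0.12, so the header's receipts root is compared via
// raw bytes here rather than by type.
use eth_trie::Trie;

fn verify_receipt_inclusion(
    merkle_tree: &mut MerkleTree,
    rlp_encoded_tx_index: &[u8],
    proof_nodes: Vec<Vec<u8>>,        // the nodes wrapped by `MerkleProof`
    receipts_root_from_header: &[u8], // e.g. `sub_mat.receipts_root.unwrap().as_bytes()`
) -> Result<Vec<u8>, MerkleError> {
    let root_hash = merkle_tree.root_hash()?;
    // Sanity check: the locally rebuilt trie must match the block header.
    assert_eq!(root_hash.as_bytes(), receipts_root_from_header);
    merkle_tree
        .verify_proof(root_hash, rlp_encoded_tx_index, proof_nodes)?
        .ok_or(MerkleError::NoKeyToProve)
}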
diff --git a/common/sentinel/src/messages/websocket/websocket_messages_args/process_batch_args.rs b/common/sentinel/src/messages/websocket/websocket_messages_args/process_batch_args.rs index 8fd0c3214..cd6a3ac66 100644 --- a/common/sentinel/src/messages/websocket/websocket_messages_args/process_batch_args.rs +++ b/common/sentinel/src/messages/websocket/websocket_messages_args/process_batch_args.rs @@ -1,16 +1,16 @@ use common_eth::EthSubmissionMaterials; -use common_network_ids::NetworkId; use derive_getters::{Dissolve, Getters}; use derive_more::Constructor; use ethereum_types::Address as EthAddress; use serde::{Deserialize, Serialize}; +use crate::{NetworkConfig}; #[derive(Debug, Clone, PartialEq, Constructor, Serialize, Deserialize, Getters, Dissolve)] pub struct WebSocketMessagesProcessBatchArgs { validate: bool, dry_run: bool, reprocess: bool, - network_id: NetworkId, + network_config: NetworkConfig, pnetwork_hub: EthAddress, sub_mat_batch: EthSubmissionMaterials, governance_address: Option<EthAddress>, @@ -19,7 +19,7 @@ pub struct WebSocketMessagesProcessBatchArgs { impl WebSocketMessagesProcessBatchArgs { pub fn new_for_syncer( validate: bool, - network_id: NetworkId, + network_config: NetworkConfig, pnetwork_hub: EthAddress, sub_mat_batch: EthSubmissionMaterials, governance_address: Option<EthAddress>, @@ -30,7 +30,7 @@ impl WebSocketMessagesProcessBatchArgs { validate, dry_run, reprocess, - network_id, + network_config, pnetwork_hub, sub_mat_batch, governance_address, diff --git a/common/sentinel/src/processor/process_batch.rs b/common/sentinel/src/processor/process_batch.rs index ecf68e34e..42e4eb1b2 100644 --- a/common/sentinel/src/processor/process_batch.rs +++ b/common/sentinel/src/processor/process_batch.rs @@ -2,7 +2,6 @@ use std::result::Result; use common::DatabaseInterface; use common_eth::{Chain, ChainDbUtils, EthSubmissionMaterials}; -use common_network_ids::NetworkId; use ethereum_types::Address as EthAddress; use super::{ @@ -11,19 +10,20 @@ use super::{ maybe_handle_challenge_solved_events, process_single, }; -use crate::{ProcessorOutput, SentinelDbUtils, SentinelError, UserOps}; +use crate::{NetworkConfig, ProcessorOutput, SentinelDbUtils, SentinelError, SignedEvents}; pub fn process_batch<D: DatabaseInterface>( db: &D, pnetwork_hub: &EthAddress, batch: &EthSubmissionMaterials, validate: bool, - network_id: &NetworkId, + network_config: &NetworkConfig, reprocess: bool, dry_run: bool, maybe_governance_address: Option<EthAddress>, sentinel_address: EthAddress, ) -> Result<ProcessorOutput, SentinelError> { + let network_id = network_config.network_id(); info!("processing {network_id} batch of submission material..."); let c_db_utils = ChainDbUtils::new(db); @@ -31,8 +31,6 @@ pub fn process_batch<D: DatabaseInterface>( let mut chain = Chain::get(&c_db_utils, network_id.try_into()?)?; - let use_db_tx = !dry_run; - if let Some(ref governance_address) = maybe_governance_address { debug!("checking for events from governance address {governance_address}"); // NOTE: If we find a governance address, it means we're on the governance chain, meaning @@ -42,7 +40,7 @@ batch.iter().try_for_each(|sub_mat| { maybe_handle_actors_propagated_events( &SentinelDbUtils::new(db), - network_id, + &network_id, governance_address, &sentinel_address, sub_mat, @@ -58,26 +56,24 @@ .iter() .try_for_each(|m| maybe_handle_challenge_solved_events(&s_db_utils, pnetwork_hub, m, &sentinel_address))?; - let processed_user_ops = UserOps::from( + let signed_events = SignedEvents::from( batch .iter() .map(|sub_mat| { process_single( db, sub_mat.clone(), - pnetwork_hub, validate, - use_db_tx, dry_run, - network_id, + network_config, reprocess, &mut chain, ) }) - .collect::<Result<Vec<UserOps>, SentinelError>>()?, + .collect::<Result<Vec<SignedEvents>, SentinelError>>()?, ); - info!("finished processing {network_id} submission material"); - let r = ProcessorOutput::new(*network_id, batch.get_last_block_num()?, processed_user_ops)?; + + let r = ProcessorOutput::new(network_id, batch.get_last_block_num()?, signed_events)?; Ok(r) } diff --git a/common/sentinel/src/processor/process_single.rs b/common/sentinel/src/processor/process_single.rs index 272448f90..50bb88812 100644 --- a/common/sentinel/src/processor/process_single.rs +++ b/common/sentinel/src/processor/process_single.rs @@ -2,31 +2,20 @@ use std::result::Result; use common::DatabaseInterface; use common_eth::{Chain,
ChainDbUtils, EthSubmissionMaterial}; -use common_network_ids::NetworkId; -use ethereum_types::Address as EthAddress; -use crate::{SentinelDbUtils, SentinelError, UserOpList, UserOps}; +use crate::{NetworkConfig, SentinelError, SignedEvents}; pub(super) fn process_single<D: DatabaseInterface>( db: &D, sub_mat: EthSubmissionMaterial, - pnetwork_hub: &EthAddress, validate: bool, - _use_db_tx: bool, dry_run: bool, - network_id: &NetworkId, + network_config: &NetworkConfig, reprocess: bool, chain: &mut Chain, -) -> Result<UserOps, SentinelError> { +) -> Result<SignedEvents, SentinelError> { let mcid = *chain.chain_id(); - // FIXE All db tx stuff currently comment out due to the below msg /* // FIXME These are handled in the strongbox core, and this breaks that. Think of a way to * get dry run capabilities back - if use_db_tx { - debug!("Starting db tx in {mcid} processor!"); - db.start_transaction()?; - } - */ + // NOTE: All db transaction stuff is handled via strongbox let chain_db_utils = ChainDbUtils::new(db); let n = sub_mat.get_block_number()?; @@ -50,39 +39,24 @@ if maybe_canon_block.is_none() { warn!("there is no canonical block on chain {mcid} yet"); - /* - if use_db_tx { - db.end_transaction()?; - }; - */ - return Ok(UserOps::empty()); + return Ok(SignedEvents::empty()); } let canonical_sub_mat = maybe_canon_block.expect("this not to fail due to above check"); if canonical_sub_mat.receipts.is_empty() { debug!("{mcid} canon block had no receipts to process"); - /* - if use_db_tx { - db.end_transaction()?; - }; - */ - return Ok(UserOps::empty()); + return Ok(SignedEvents::empty()); } - let ops = UserOps::from_sub_mat(network_id, pnetwork_hub, &canonical_sub_mat)?; - debug!("found user ops: {ops}"); - - let sentinel_db_utils = SentinelDbUtils::new(db); - let mut user_op_list = UserOpList::get(&sentinel_db_utils); - user_op_list.process_ops(ops.clone(), &sentinel_db_utils)?; + let signed_events = SignedEvents::try_from(( + &chain.mcid(), + &chain_db_utils.get_pk()?, + &canonical_sub_mat, + network_config, + ))?; - /* - if use_db_tx { - debug!("ending db tx in {mcid} processor!"); - db.end_transaction()?; - } - */ + debug!("found signed events: {signed_events:?}"); debug!("finished processing {mcid} block {n}"); - Ok(ops) + Ok(signed_events) } diff --git a/common/sentinel/src/processor/processor_output.rs b/common/sentinel/src/processor/processor_output.rs index 92e99740b..e8ee60212 100644 --- a/common/sentinel/src/processor/processor_output.rs +++ b/common/sentinel/src/processor/processor_output.rs @@ -5,32 +5,32 @@ use derive_getters::Getters; use serde::{Deserialize, Serialize}; use serde_json::Value as Json; -use crate::{get_utc_timestamp, SentinelError, UserOps}; +use crate::{get_utc_timestamp, SentinelError, SignedEvents}; #[derive(Clone, Debug, Default, Serialize, Deserialize, Getters)] pub struct ProcessorOutput { timestamp: u64, network_id: NetworkId, latest_block_num: u64, - processed_user_ops: UserOps, + signed_events: SignedEvents, } impl ProcessorOutput { pub fn new( network_id: NetworkId, latest_block_num: u64, - processed_user_ops: UserOps, + signed_events: SignedEvents, ) -> Result<Self, SentinelError> { Ok(Self { network_id, + signed_events, latest_block_num, - processed_user_ops, timestamp: get_utc_timestamp()?, }) } pub fn has_user_ops(&self) -> bool { - !self.processed_user_ops.is_empty() + !self.signed_events.is_empty() } } diff --git a/common/sentinel/src/signed_events/error.rs b/common/sentinel/src/signed_events/error.rs new file mode 100644 index 000000000..6fff0c515 --- /dev/null +++
b/common/sentinel/src/signed_events/error.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SignedEventError { + #[error("merkle proof error: {0}")] + MerkleProof(#[from] crate::MerkleError), + + #[error("common error: {0}")] + Common(#[from] common::CommonError), + + #[error("network id error: {0}")] + NetworkId(#[from] common_network_ids::NetworkIdError), +} diff --git a/common/sentinel/src/signed_events/mod.rs b/common/sentinel/src/signed_events/mod.rs new file mode 100644 index 000000000..b1fb42ced --- /dev/null +++ b/common/sentinel/src/signed_events/mod.rs @@ -0,0 +1,11 @@ +mod error; +mod signed_event; +mod signed_events; +mod version; + +pub use self::{ + error::SignedEventError, + signed_event::SignedEvent, + signed_events::SignedEvents, + version::SignedEventVersion, +}; diff --git a/common/sentinel/src/signed_events/signed_event.rs b/common/sentinel/src/signed_events/signed_event.rs new file mode 100644 index 000000000..a3d7f6f90 --- /dev/null +++ b/common/sentinel/src/signed_events/signed_event.rs @@ -0,0 +1,90 @@ +use common::types::Bytes; +use common_chain_ids::EthChainId; +use common_eth::{EthLog, EthPrivateKey, EthSigningCapabilities}; +use common_metadata::{MetadataChainId, MetadataProtocolId}; +use derive_getters::Getters; +use derive_more::Deref; +use ethereum_types::H256 as EthHash; +use serde::{Deserialize, Serialize}; + +use super::{SignedEventError, SignedEventVersion}; +use crate::MerkleProof; + +#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize, Getters)] +pub struct SignedEvent { + log: EthLog, + block_hash: EthHash, + // NOTE: String in case format changes, plus can't auto derive ser/de on [u8; 65] + // It's an option so we can create the struct with no signature then add it later. 
+ encoded_event: Option<String>, + signature: Option<String>, + event_id: String, + public_key: String, + version: SignedEventVersion, +} + +fn calculate_event_id() -> EventId { + // FIXME sha256(protocol, protocol_chain_id, blockhash, unique_event_identifier_such_as_merklepathonevm) + // Currently, arbitrary bytes + EventId([0xab, 0xcd, 0xee, 0xff]) +} + +impl SignedEvent { + pub(super) fn new( + log: EthLog, + block_hash: EthHash, + // FIXME reintroduce + _merkle_proof: MerkleProof, + _metadata_chain_id: MetadataChainId, + pk: &EthPrivateKey, + ) -> Result<Self, SignedEventError> { + let event_id = calculate_event_id().to_string(); + let public_key = pk.to_public_key().public_key.to_string(); + let mut signed_event = Self { + log, + block_hash, + encoded_event: None, + signature: None, + event_id, + public_key, + version: SignedEventVersion::current(), + }; + let encoded_event = signed_event.encode(); + signed_event.encoded_event = Some(encoded_event.to_string()); + let sig = pk.sha256_hash_and_sign_msg(encoded_event.as_slice())?; + signed_event.signature = Some(sig.to_string()); + Ok(signed_event) + } + + fn encode(&self) -> EncodedEvent { + EncodedEvent( + [ + self.version.as_bytes(), + &[MetadataProtocolId::Ethereum.to_byte()], + EthChainId::Mainnet.to_bytes().as_ref().unwrap(), + &self.event_id.as_bytes(), + &self.log.data.len().to_le_bytes(), + self.log.data.as_ref(), + ] + .concat(), + ) + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct EventId(pub [u8; 4]); + +impl std::fmt::Display for EventId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Deref)] +pub struct EncodedEvent(pub Bytes); + +impl std::fmt::Display for EncodedEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(&self.0)) + } +}
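// --- Editor's note: illustrative sketch, not part of the diff ----------------
// One possible shape for the `calculate_event_id` FIXME above, i.e. a sha256
// over (protocol, chain id, block hash, merkle path) as the comment suggests.
// The `sha2` crate, the helper name and the 32-byte output are assumptions; the
// PR as written still returns four fixed bytes.
use sha2::{Digest, Sha256};

fn calculate_event_id_sketch(
    protocol_byte: u8,       // e.g. MetadataProtocolId::Ethereum.to_byte()
    chain_id_bytes: &[u8],   // e.g. the chain-id bytes already used in `encode()`
    block_hash: &EthHash,
    merkle_path: &[Vec<u8>], // the proof nodes held by `MerkleProof`
) -> [u8; 32] {
    let mut hasher = Sha256::new();
    hasher.update([protocol_byte]);
    hasher.update(chain_id_bytes);
    hasher.update(block_hash.as_bytes());
    for node in merkle_path {
        hasher.update(node);
    }
    hasher.finalize().into()
}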
diff --git a/common/sentinel/src/signed_events/signed_events.rs b/common/sentinel/src/signed_events/signed_events.rs new file mode 100644 index 000000000..7bce85683 --- /dev/null +++ b/common/sentinel/src/signed_events/signed_events.rs @@ -0,0 +1,114 @@ +use common_eth::{EthLog, EthPrivateKey, EthSubmissionMaterial}; +use common_metadata::MetadataChainId; +use derive_more::{Constructor, Deref, DerefMut}; +use serde::{Deserialize, Serialize}; + +use super::{SignedEvent, SignedEventError}; +use crate::{ConfiguredEvent, MerkleProof, MerkleTree, NetworkConfig}; + +#[derive(Debug, Clone, Default, Serialize, Deserialize, Constructor, Deref, DerefMut)] +pub struct SignedEvents(Vec<SignedEvent>); + +impl SignedEvents { + pub fn empty() -> Self { + Self::default() + } +} + +impl From<Vec<SignedEvents>> for SignedEvents { + fn from(vec_of_signed_events: Vec<SignedEvents>) -> Self { + let mut r: SignedEvents = SignedEvents::empty(); + for signed_events in vec_of_signed_events.into_iter() { + for signed_event in signed_events.0.into_iter() { + r.push(signed_event) + } + } + r + } +} + +impl TryFrom<(&MetadataChainId, &EthPrivateKey, &EthSubmissionMaterial, &NetworkConfig)> for SignedEvents { + type Error = SignedEventError; + + fn try_from( + (metadata_chain_id, private_key, eth_submission_material, network_config): ( + &MetadataChainId, + &EthPrivateKey, + &EthSubmissionMaterial, + &NetworkConfig, + ), + ) -> Result<Self, Self::Error> { + let block_hash = eth_submission_material.get_block_hash()?; + let mut merkle_tree = MerkleTree::try_from(eth_submission_material)?; + let mut relevant_infos: Vec<(MerkleProof, Vec<EthLog>)> = vec![]; + + // NOTE: These are the events that the sentinel is configured to watch out for (via the config file) + for ConfiguredEvent { address, topic } in network_config.events().iter() { + for receipt in eth_submission_material.receipts.iter() { + let mut relevant_logs = vec![]; + + for log in receipt.logs.iter() { + if log.is_from_address_and_contains_topic(address, topic) { + relevant_logs.push(log.clone()) + }; + } + + if relevant_logs.is_empty() { + continue; + } else { + debug!("found {} relevant logs", relevant_logs.len()); + let (transaction_index, _) = receipt.get_rlp_encoded_index_and_rlp_encoded_receipt_tuple()?; + let receipt_inclusion_proof = + MerkleProof::try_from((&mut merkle_tree, transaction_index.as_ref()))?; + relevant_infos.push((receipt_inclusion_proof, relevant_logs.clone())); + relevant_logs.clear(); + } + } + } + + let mut signed_events = vec![]; + for (receipt_inclusion_proof, logs) in relevant_infos.into_iter() { + for log in logs.into_iter() { + let signed_event = SignedEvent::new( + log, + block_hash, + receipt_inclusion_proof.clone(), + *metadata_chain_id, + private_key, + )?; + signed_events.push(signed_event); + } + } + + Ok(Self::new(signed_events)) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use super::*; + use crate::{config::SentinelConfig, test_utils::get_sample_sub_mat_n}; + + #[test] + fn should_get_signed_events() { + let sub_mat = get_sample_sub_mat_n(1); + let metadata_chain_id = MetadataChainId::EthereumMainnet; + let pk = EthPrivateKey::from_str("e8eeb2631ab476dacd68f84eb0b9ee558b872f5155a088bf74381b5f2c63a130").unwrap(); + let path = "src/signed_events/test_utils/sample-config"; + let sample_config = SentinelConfig::new(path).unwrap(); + let network_config: &NetworkConfig = sample_config.networks().values().collect::<Vec<_>>()[0]; + let result = SignedEvents::try_from((&metadata_chain_id, &pk, &sub_mat, network_config)).unwrap(); + + let receipt = sub_mat.receipts[0].clone(); + let log = receipt.logs[0].clone(); + let block_hash = sub_mat.get_block_hash().unwrap(); + let (tx_index, _) = receipt.get_rlp_encoded_index_and_rlp_encoded_receipt_tuple().unwrap(); + let mut merkle_tree = MerkleTree::try_from(&sub_mat).unwrap(); + let merkle_proof = MerkleProof::try_from((&mut merkle_tree, tx_index.as_ref())).unwrap(); + let expected_result = SignedEvent::new(log, block_hash, merkle_proof, metadata_chain_id, &pk).unwrap(); + assert_eq!(result.len(), 5); + assert_eq!(result[0], expected_result); + } +} diff --git a/common/sentinel/src/signed_events/test_utils/sample-config.toml b/common/sentinel/src/signed_events/test_utils/sample-config.toml new file mode 100644 index 000000000..119283aeb --- /dev/null +++ b/common/sentinel/src/signed_events/test_utils/sample-config.toml @@ -0,0 +1,35 @@ +[log] +path = "" +level = "" +enabled = false +max_log_size = 1_000_000_000 +max_num_logs = 1 +use_file_logging = false + +[ipfs] +ipfs_bin_path = "" +status_update_frequency = 0 + +[core] +timeout = 0 +challenge_response_frequency = 0 + +[governance] +address = "0x0000000000000000000000000000000000000000" +network_id = "ethereum" + + +[networks.eth] +pnetwork_hub = "0x0000000000000000000000000000000000000000" +endpoints = [ "" ] +sleep_duration = 0 +network_id = "ethereum" +validate = false +gas_limit = 0 +batch_size = 1000 +batch_duration = 0 +pre_filter_receipts = false +base_challenge_period_duration = 0 +events = [ + ["0x990f341946a3fdb507ae7e52d17851b87168017c", "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"], +] diff --git a/common/sentinel/src/signed_events/version.rs b/common/sentinel/src/signed_events/version.rs new file mode 100644
index 000000000..c8c2bf6aa --- /dev/null +++ b/common/sentinel/src/signed_events/version.rs @@ -0,0 +1,24 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub enum SignedEventVersion { + V1, +} + +impl Default for SignedEventVersion { + fn default() -> Self { + Self::current() + } +} + +impl SignedEventVersion { + pub fn current() -> Self { + Self::V1 + } + + pub fn as_bytes(&self) -> &[u8] { + match self { + Self::V1 => &[1], + } + } +} diff --git a/v3_bridges/sentinel-app/src/main.rs b/v3_bridges/sentinel-app/src/main.rs index a3d51064d..6053d7557 100644 --- a/v3_bridges/sentinel-app/src/main.rs +++ b/v3_bridges/sentinel-app/src/main.rs @@ -6,7 +6,6 @@ mod start_sentinel; mod status_publisher; mod syncer; mod type_aliases; -mod user_op_canceller; mod ws_server; #[macro_use] diff --git a/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_cancel_user_ops.rs b/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_cancel_user_ops.rs deleted file mode 100644 index 91cd68747..000000000 --- a/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_cancel_user_ops.rs +++ /dev/null @@ -1,18 +0,0 @@ -use common_sentinel::{SentinelError, UserOpCancellerMessages}; -use serde_json::{json, Value as Json}; - -use crate::{rpc_server::RpcCalls, type_aliases::UserOpCancellerTx}; - -// NOTE: This RPC call will attempt to cancel ALL cancellable user ops. -impl RpcCalls { - pub(crate) async fn handle_cancel_user_ops( - user_op_canceller_tx: UserOpCancellerTx, - core_cxn: bool, - ) -> Result { - Self::check_core_is_connected(core_cxn)?; - user_op_canceller_tx - .send(UserOpCancellerMessages::CancelUserOps) - .await?; - Ok(json!({"msg": "user op canceller instructed to cancel any cancellable user ops"})) - } -} diff --git a/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_process_block.rs b/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_process_block.rs index 051cc3ee7..66fb24d99 100644 --- a/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_process_block.rs +++ b/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_process_block.rs @@ -45,11 +45,13 @@ impl RpcCalls { // NOTE: The processor always works on batches let batch = EthSubmissionMaterials::new(vec![sub_mat]); + let network_config = config.networks().get(&network_id).unwrap(); + let submit_args = WebSocketMessagesProcessBatchArgs::new( config.validate(&network_id)?, dry_run, reprocess, - network_id, + network_config.clone(), config.pnetwork_hub(&network_id)?, batch, config.governance_address(&network_id), diff --git a/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_set_user_op_canceller_frequency.rs b/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_set_user_op_canceller_frequency.rs deleted file mode 100644 index b2c839256..000000000 --- a/v3_bridges/sentinel-app/src/rpc_server/handlers/handle_set_user_op_canceller_frequency.rs +++ /dev/null @@ -1,22 +0,0 @@ -use common_sentinel::{sanity_check_frequency, SentinelError, UserOpCancellerMessages}; -use serde_json::{json, Value as Json}; - -use crate::{ - rpc_server::{RpcCalls, RpcParams}, - type_aliases::UserOpCancellerTx, -}; - -impl RpcCalls { - pub(crate) async fn handle_set_user_op_canceller_frequency( - params: RpcParams, - tx: UserOpCancellerTx, - ) -> Result { - debug!("handling set user op canceller frequency rpc call..."); - let checked_params = Self::check_params(params, 1)?; - let frequency = checked_params[0].parse::()?; - let sanity_checked_frequency = sanity_check_frequency(frequency)?; - let 
msg = UserOpCancellerMessages::SetFrequency(sanity_checked_frequency); - tx.send(msg).await?; - Ok(json!({"userOpCancellerUpdateFrequency": sanity_checked_frequency})) - } -} diff --git a/v3_bridges/sentinel-app/src/rpc_server/handlers/mod.rs b/v3_bridges/sentinel-app/src/rpc_server/handlers/mod.rs index 5ebbc79dc..dad6667d6 100644 --- a/v3_bridges/sentinel-app/src/rpc_server/handlers/mod.rs +++ b/v3_bridges/sentinel-app/src/rpc_server/handlers/mod.rs @@ -1,5 +1,4 @@ mod handle_add_debug_signers; -mod handle_cancel_user_ops; mod handle_challenge_responder_start_stop; mod handle_db_ops; mod handle_get_attestation_certificate; @@ -31,7 +30,6 @@ mod handle_remove_user_op; mod handle_reset_chain; mod handle_set_challenge_responder_frequency; mod handle_set_status_publishing_frequency; -mod handle_set_user_op_canceller_frequency; mod handle_sign_message; mod handle_status_publisher_start_stop; mod handle_sync_state; diff --git a/v3_bridges/sentinel-app/src/rpc_server/rpc_calls.rs b/v3_bridges/sentinel-app/src/rpc_server/rpc_calls.rs index b74e3462e..6303343f6 100644 --- a/v3_bridges/sentinel-app/src/rpc_server/rpc_calls.rs +++ b/v3_bridges/sentinel-app/src/rpc_server/rpc_calls.rs @@ -14,14 +14,7 @@ use super::{ type_aliases::{RpcId, RpcParams}, JsonRpcRequest, }; -use crate::type_aliases::{ - BroadcastChannelTx, - ChallengeResponderTx, - CoreCxnStatus, - StatusPublisherTx, - UserOpCancellerTx, - WebSocketTx, -}; +use crate::type_aliases::{BroadcastChannelTx, ChallengeResponderTx, CoreCxnStatus, StatusPublisherTx, WebSocketTx}; #[derive(Debug, Clone, Serialize, Deserialize)] struct Error(String); @@ -60,7 +53,6 @@ pub(crate) enum RpcCalls { GetInclusionProof(RpcId, WebSocketTx, CoreCxnStatus), GetChallengesList(RpcId, WebSocketTx, CoreCxnStatus), Delete(RpcId, WebSocketTx, RpcParams, CoreCxnStatus), - CancelUserOps(RpcId, UserOpCancellerTx, CoreCxnStatus), GetUserOp(RpcId, RpcParams, WebSocketTx, CoreCxnStatus), GetStatus(RpcId, WebSocketTx, RpcParams, CoreCxnStatus), HardReset(RpcId, RpcParams, WebSocketTx, CoreCxnStatus), @@ -78,7 +70,6 @@ pub(crate) enum RpcCalls { StopSyncer(RpcId, BroadcastChannelTx, RpcParams, CoreCxnStatus), RemoveDebugSigner(RpcId, RpcParams, WebSocketTx, CoreCxnStatus), StartSyncer(RpcId, BroadcastChannelTx, RpcParams, CoreCxnStatus), - SetUserOpCancellerFrequency(RpcId, RpcParams, UserOpCancellerTx), GetBalances(RpcId, Box, RpcParams, EthRpcSenders), SetStatusPublishingFrequency(RpcId, RpcParams, StatusPublisherTx), GetAttestionSignature(RpcId, RpcParams, WebSocketTx, CoreCxnStatus), @@ -130,7 +121,6 @@ impl RpcCalls { config: SentinelConfig, websocket_tx: WebSocketTx, eth_rpc_senders: EthRpcSenders, - user_op_canceller_tx: UserOpCancellerTx, broadcast_channel_tx: BroadcastChannelTx, status_tx: StatusPublisherTx, challenge_responder_tx: ChallengeResponderTx, @@ -158,7 +148,6 @@ impl RpcCalls { "getUserOpByTxHash" => Self::GetUserOpByTxHash(*r.id(), r.params(), websocket_tx, core_cxn), "removeDebugSigner" => Self::RemoveDebugSigner(*r.id(), r.params(), websocket_tx, core_cxn), "getAttestationCertificate" => Self::GetAttestionCertificate(*r.id(), websocket_tx, core_cxn), - "cancel" | "cancelUserOps" => Self::CancelUserOps(*r.id(), user_op_canceller_tx.clone(), core_cxn), "startChallengeResponder" => Self::ChallengeResponderStartStop(*r.id(), broadcast_channel_tx, true), "stopChallengeResponder" => Self::ChallengeResponderStartStop(*r.id(), broadcast_channel_tx, false), "getChallengesList" | "getChallengeList" => Self::GetChallengesList(*r.id(), websocket_tx, core_cxn), 
@@ -169,9 +158,6 @@ impl RpcCalls { "getRegistrationExtensionTx" => { Self::GetRegistrationExtensionTx(*r.id(), Box::new(config.clone()), r.params(), eth_rpc_senders.clone()) }, - "setUserOpCancellerFrequency" => { - Self::SetUserOpCancellerFrequency(*r.id(), r.params(), user_op_canceller_tx) - }, "setChallengeResponderFrequency" => { Self::SetChallengeResponderFrequency(*r.id(), r.params(), challenge_responder_tx) }, @@ -278,11 +264,6 @@ impl RpcCalls { let json = create_json_rpc_response_from_result(id, result, 1337); Ok(warp::reply::json(&json)) }, - Self::SetUserOpCancellerFrequency(id, user_op_canceller_tx, params) => { - let result = Self::handle_set_user_op_canceller_frequency(user_op_canceller_tx, params).await; - let json = create_json_rpc_response_from_result(id, result, 1337); - Ok(warp::reply::json(&json)) - }, Self::SetChallengeResponderFrequency(id, challenge_tx, params) => { let result = Self::handle_set_challenge_responder_frequency(challenge_tx, params).await; let json = create_json_rpc_response_from_result(id, result, 1337); @@ -331,11 +312,6 @@ impl RpcCalls { Self::GetStatus(id, websocket_tx, params, core_cxn) => { Self::handle_ws_result(id, Self::handle_get_status(websocket_tx, params, core_cxn).await) }, - Self::CancelUserOps(id, user_op_canceller_tx, core_cxn) => { - let result = Self::handle_cancel_user_ops(user_op_canceller_tx, core_cxn).await; - let json = create_json_rpc_response_from_result(id, result, 1337); - Ok(warp::reply::json(&json)) - }, Self::UserOpCancellerStartStop(id, broadcast_channel_tx, core_cxn, start) => { let result = Self::handle_user_op_canceller_start_stop(broadcast_channel_tx, core_cxn, start).await; let json = create_json_rpc_response_from_result(id, result, 1337); diff --git a/v3_bridges/sentinel-app/src/rpc_server/rpc_server_loop.rs b/v3_bridges/sentinel-app/src/rpc_server/rpc_server_loop.rs index 5e6efebe5..dfb290c60 100644 --- a/v3_bridges/sentinel-app/src/rpc_server/rpc_server_loop.rs +++ b/v3_bridges/sentinel-app/src/rpc_server/rpc_server_loop.rs @@ -13,7 +13,6 @@ use crate::type_aliases::{ BroadcastChannelTx, ChallengeResponderTx, StatusPublisherTx, - UserOpCancellerTx, WebSocketTx, }; @@ -23,7 +22,6 @@ async fn start_rpc_server( config: SentinelConfig, broadcast_channel_tx: BroadcastChannelTx, core_cxn: bool, - user_op_canceller_tx: UserOpCancellerTx, status_tx: StatusPublisherTx, challenge_responder_tx: ChallengeResponderTx, ) -> Result<(), SentinelError> { @@ -32,7 +30,6 @@ async fn start_rpc_server( let status_tx_filter = warp::any().map(move || status_tx.clone()); let websocket_tx_filter = warp::any().map(move || websocket_tx.clone()); let eth_rpc_senders_filter = warp::any().map(move || eth_rpc_senders.clone()); - let broadcaster_tx_filter = warp::any().map(move || user_op_canceller_tx.clone()); let broadcast_channel_tx_filter = warp::any().map(move || broadcast_channel_tx.clone()); let challenge_responder_tx_filter = warp::any().map(move || challenge_responder_tx.clone()); @@ -45,7 +42,6 @@ async fn start_rpc_server( .and(warp::any().map(move || config.clone())) .and(websocket_tx_filter.clone()) .and(eth_rpc_senders_filter.clone()) - .and(broadcaster_tx_filter.clone()) .and(broadcast_channel_tx_filter.clone()) .and(status_tx_filter.clone()) .and(challenge_responder_tx_filter.clone()) @@ -79,7 +75,6 @@ pub async fn rpc_server_loop( websocket_tx: WebSocketTx, config: SentinelConfig, broadcast_channel_tx: BroadcastChannelTx, - user_op_canceller_tx: UserOpCancellerTx, status_tx: StatusPublisherTx, challenge_responder_tx: 
ChallengeResponderTx, ) -> Result<(), SentinelError> { @@ -109,7 +104,6 @@ pub async fn rpc_server_loop( config.clone(), broadcast_channel_tx.clone(), core_connection_status, - user_op_canceller_tx.clone(), status_tx.clone(), challenge_responder_tx.clone(), ), if rpc_server_is_enabled => { diff --git a/v3_bridges/sentinel-app/src/start_sentinel.rs b/v3_bridges/sentinel-app/src/start_sentinel.rs index e691765f6..f46f26353 100644 --- a/v3_bridges/sentinel-app/src/start_sentinel.rs +++ b/v3_bridges/sentinel-app/src/start_sentinel.rs @@ -9,7 +9,6 @@ use common_sentinel::{ SentinelConfig, SentinelError, StatusPublisherMessages, - UserOpCancellerMessages, WebSocketMessages, }; use futures::future::try_join_all; @@ -26,7 +25,6 @@ use crate::{ rpc_server::rpc_server_loop, status_publisher::status_publisher_loop, syncer::syncer, - user_op_canceller::user_op_canceller_loop, ws_server::ws_server_loop, }; @@ -49,11 +47,6 @@ pub async fn start_sentinel(config: &SentinelConfig, disable: bool) -> Result, MpscRx) = mpsc::channel(MAX_CHANNEL_CAPACITY); - let (user_op_canceller_tx, user_op_canceller_rx): ( - MpscTx, - MpscRx, - ) = mpsc::channel(MAX_CHANNEL_CAPACITY); - let status_thread = tokio::spawn(status_publisher_loop( config.clone(), status_rx, @@ -73,22 +66,11 @@ pub async fn start_sentinel(config: &SentinelConfig, disable: bool) -> Result Result Result<(), SentinelError> { let network_id = *batch.network_id(); + let network_config = config.networks().get(&network_id).unwrap(); let log_prefix = format!("{network_id} syncer"); let validate = matches!(config.validate(&network_id), Ok(true)); let pnetwork_hub = config.pnetwork_hub(&network_id)?; @@ -75,7 +77,7 @@ pub(super) async fn syncer_loop( info!("{log_prefix} batch is ready to submit!"); let args = WebSocketMessagesProcessBatchArgs::new_for_syncer( validate, - network_id, + network_config.clone(), pnetwork_hub, batch.to_submission_material(), *batch.governance_address(), @@ -93,8 +95,13 @@ pub(super) async fn syncer_loop( }; match websocket_response { Ok(WebSocketMessagesEncodable::Success(output)) => { + // FIXME Handle below result more explicitly if you don't want a crash on + // the error variant + let processor_output = ProcessorOutput::try_from(output)?; + todo!("now you have `processor_output.signed_events()` and you can do with them what you wish/put in mongo/whatever"); + debug!("{log_prefix} websocket channel returned success output: {output}"); - batch.update_bpm_from_json(output); + batch.update_bpm(&processor_output); batch.increment_block_num(); }, Ok(WebSocketMessagesEncodable::Error(WebSocketMessagesError::NoParent(e))) => { diff --git a/v3_bridges/sentinel-app/src/type_aliases.rs b/v3_bridges/sentinel-app/src/type_aliases.rs index 50a8ffe67..4810a68c0 100644 --- a/v3_bridges/sentinel-app/src/type_aliases.rs +++ b/v3_bridges/sentinel-app/src/type_aliases.rs @@ -3,7 +3,6 @@ use common_sentinel::{ ChallengeResponderMessages, EthRpcMessages, StatusPublisherMessages, - UserOpCancellerMessages, WebSocketMessages, }; use tokio::sync::{ @@ -15,8 +14,6 @@ pub(crate) type CoreCxnStatus = bool; pub(crate) type EthRpcTx = MpscTx; pub(crate) type WebSocketRx = MpscRx; pub(crate) type WebSocketTx = MpscTx; -pub(crate) type UserOpCancellerTx = MpscTx; -pub(crate) type UserOpCancellerRx = MpscRx; pub(crate) type StatusPublisherTx = MpscTx; pub(crate) type StatusPublisherRx = MpscRx; pub(crate) type BroadcastChannelTx = MpmcTx; diff --git a/v3_bridges/sentinel-app/src/user_op_canceller/mod.rs b/v3_bridges/sentinel-app/src/user_op_canceller/mod.rs 
deleted file mode 100644 index 004f6b03e..000000000 --- a/v3_bridges/sentinel-app/src/user_op_canceller/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod user_op_canceller_loop; - -pub(crate) use self::user_op_canceller_loop::user_op_canceller_loop; diff --git a/v3_bridges/sentinel-app/src/user_op_canceller/user_op_canceller_loop.rs b/v3_bridges/sentinel-app/src/user_op_canceller/user_op_canceller_loop.rs deleted file mode 100644 index 6281cd73e..000000000 --- a/v3_bridges/sentinel-app/src/user_op_canceller/user_op_canceller_loop.rs +++ /dev/null @@ -1,337 +0,0 @@ -use std::result::Result; - -use common_chain_ids::EthChainId; -use common_eth::EthPrivateKey; -use common_network_ids::NetworkId; -use common_sentinel::{ - call_core, - BroadcastChannelMessages, - CancellableUserOp, - CancellableUserOps, - Env, - EthRpcMessages, - EthRpcSenders, - SentinelConfig, - SentinelError, - UserOpCancellationSignature, - UserOpCancellerBroadcastChannelMessages, - UserOpCancellerMessages, - UserOpError, - WebSocketMessagesCancelUserOpArgs, - WebSocketMessagesEncodable, -}; -use ethereum_types::{H256 as EthHash, U256}; -use tokio::time::{sleep, Duration}; - -use crate::type_aliases::{ - BroadcastChannelRx, - BroadcastChannelTx, - EthRpcTx, - UserOpCancellerRx, - UserOpCancellerTx, - WebSocketTx, -}; - -async fn cancel_user_op( - cancellable_user_op: CancellableUserOp, - nonce: u64, - balance: U256, - gas_price: u64, - gas_limit: usize, - config: &SentinelConfig, - broadcasting_pk: &EthPrivateKey, - eth_rpc_tx: EthRpcTx, - websocket_tx: WebSocketTx, -) -> Result { - // NOTE: Check we can afford the tx - cancellable_user_op - .op() - .check_affordability(balance, gas_limit, gas_price)?; - - let network_id_to_cancel_on = cancellable_user_op.network_id_to_cancel_on()?; - - let pnetwork_hub = config.pnetwork_hub(&network_id_to_cancel_on)?; - - debug!("cancelling user op on enqueued network: {network_id_to_cancel_on} nonce: {nonce} gas price: {gas_price}"); - - let (msg, rx) = - EthRpcMessages::get_user_op_state_msg(network_id_to_cancel_on, cancellable_user_op.op().clone(), pnetwork_hub); - eth_rpc_tx.send(msg).await?; - let user_op_smart_contract_state = rx.await??; - debug!("user op state before cancellation: {user_op_smart_contract_state}"); - - if !user_op_smart_contract_state.is_cancellable() { - error!( - "cannot cancel user op - smart contract state of {} means it's not cancellable!", - user_op_smart_contract_state - ); - return Err(UserOpError::CannotCancel(Box::new(cancellable_user_op.op().clone())).into()); - } - - let msg = WebSocketMessagesEncodable::GetUserOpCancellationSignature(Box::new( - WebSocketMessagesCancelUserOpArgs::new(config.network_ids(), cancellable_user_op.op().clone()), - )); - - let cancellation_sig = - UserOpCancellationSignature::try_from(call_core(*config.core().timeout(), websocket_tx.clone(), msg).await?)?; - - let ecid: EthChainId = EthChainId::try_from(network_id_to_cancel_on)?; - - let signed_tx = cancellable_user_op.op().get_cancellation_tx( - nonce, - gas_price, - gas_limit, - &pnetwork_hub, - &ecid, - broadcasting_pk, - &cancellation_sig, - )?; - - debug!("signed tx: {}", signed_tx.serialize_hex()); - - let (msg, rx) = EthRpcMessages::get_push_tx_msg(signed_tx, network_id_to_cancel_on); - eth_rpc_tx.send(msg).await?; - let tx_hash = rx.await??; - - info!("tx hash: {tx_hash}"); - Ok(tx_hash) -} - -async fn get_gas_price( - config: &SentinelConfig, - network_id: &NetworkId, - eth_rpc_tx: EthRpcTx, -) -> Result { - let p = if let Ok(Some(p)) = config.gas_price(network_id) { - 
debug!("using {network_id} gas price from config: {p}"); - p - } else { - let (msg, rx) = EthRpcMessages::get_gas_price_msg(*network_id); - eth_rpc_tx.send(msg).await?; - let p = rx.await??; - debug!("using {network_id} gas price from rpc: {p}"); - p - }; - Ok(p) -} - -async fn cancel_user_ops( - config: &SentinelConfig, - websocket_tx: WebSocketTx, - eth_rpc_senders: EthRpcSenders, - broadcasting_pk: &EthPrivateKey, -) -> Result<(), SentinelError> { - info!("handling user op cancellation request..."); - let cancellable_user_ops = CancellableUserOps::try_from( - call_core( - *config.core().timeout(), - websocket_tx.clone(), - WebSocketMessagesEncodable::GetCancellableUserOps(config.network_ids()), - ) - .await?, - )?; - - if cancellable_user_ops.is_empty() { - warn!("no user ops to cancel"); - return Ok(()); - } - - let broadcasting_address = broadcasting_pk.to_address(); - let err_msg = "error cancelling user op "; - - for cancellable_op in cancellable_user_ops.iter() { - let network_id_to_cancel_on = cancellable_op.network_id_to_cancel_on()?; - let sender = eth_rpc_senders.sender(&network_id_to_cancel_on)?; - - let (balance_msg, balance_rx) = - EthRpcMessages::get_eth_balance_msg(network_id_to_cancel_on, broadcasting_address); - sender.send(balance_msg).await?; - let balance = balance_rx.await??; - - let (msg, rx) = EthRpcMessages::get_nonce_msg(network_id_to_cancel_on, broadcasting_address); - sender.send(msg).await?; - let nonce = rx.await??; - - let gas_price = get_gas_price(config, &network_id_to_cancel_on, sender.clone()).await?; - let gas_limit = config.gas_limit(&network_id_to_cancel_on)?; - let uid = cancellable_op.uid()?; - match cancel_user_op( - cancellable_op.clone(), - nonce, - balance, - gas_price, - gas_limit, - config, - broadcasting_pk, - sender.clone(), - websocket_tx.clone(), - ) - .await - { - Err(e) => { - error!("{err_msg} {uid} {e}"); - }, - Ok(tx_hash) => { - info!( - "user cancellable op {uid} cancelled successfully @ tx {} on {network_id_to_cancel_on}", - hex::encode(tx_hash.as_bytes()) - ); - }, - } - } - - Ok(()) -} - -async fn broadcast_channel_loop( - mut broadcast_channel_rx: BroadcastChannelRx, -) -> Result { - // NOTE: This loops continuously listening to the broadcasting channel, and only returns if we - // receive a pertinent message. This way, other messages won't cause early returns in the main - // tokios::select, so then the main_loop can continue doing its work. - 'broadcast_channel_loop: loop { - match broadcast_channel_rx.recv().await { - Ok(BroadcastChannelMessages::UserOpCanceller(msg)) => break 'broadcast_channel_loop Ok(msg), - Ok(_) => continue 'broadcast_channel_loop, // NOTE: The message wasn't for us - Err(e) => break 'broadcast_channel_loop Err(e.into()), - } - } -} - -async fn cancellation_loop(frequency: &u64, user_op_canceller_tx: UserOpCancellerTx) -> Result<(), SentinelError> { - // NOTE: This loop runs to send messages to the canceller loop at a configruable frequency to tell - // it to try and cancel any cancellable user ops. It should never return, except in error. 
- 'cancellation_loop: loop { - sleep(Duration::from_secs(*frequency)).await; - warn!("{frequency}s has elapsed - sending message to cancel any cancellable user ops"); - match user_op_canceller_tx.send(UserOpCancellerMessages::CancelUserOps).await { - Ok(_) => continue 'cancellation_loop, - Err(e) => break 'cancellation_loop Err(e.into()), - } - } -} - -pub async fn user_op_canceller_loop( - mut user_op_canceller_rx: UserOpCancellerRx, - eth_rpc_senders: EthRpcSenders, - config: SentinelConfig, - broadcast_channel_tx: BroadcastChannelTx, - websocket_tx: WebSocketTx, - user_op_canceller_tx: UserOpCancellerTx, - disable: bool, -) -> Result<(), SentinelError> { - let name = "user op canceller"; - - let mut frequency = 120; // FIXME make configurable! Make updatable whilst running too! - let mut is_enabled = !disable; - let mut core_is_connected = false; - - warn!("{name} not active yet due to no core connection"); - - Env::init()?; - let broadcasting_pk = Env::get_private_key()?; - - 'user_op_canceller_loop: loop { - tokio::select! { - r = cancellation_loop( - &frequency, - user_op_canceller_tx.clone(), - ), if (is_enabled && core_is_connected) => { - let sleep_time = 30; - match r { - Ok(_) => { - warn!("user op canceller cancellation loop returned Ok(()) for some reason"); - }, - Err(e) => { - error!("user op canceller cancellation loop error: {e}"); - } - } - warn!("sleeping for {sleep_time}s and restarting broadcaster loop"); - sleep(Duration::from_secs(sleep_time)).await; - continue 'user_op_canceller_loop - }, - r = broadcast_channel_loop(broadcast_channel_tx.subscribe()) => { - match r { - Ok(msg) => { - let note = format!("(core is currently {}connected)", if core_is_connected { "" } else { "not "}); - match msg { - UserOpCancellerBroadcastChannelMessages::Stop => { - warn!("msg received to stop the {name} {note}"); - is_enabled = false; - continue 'user_op_canceller_loop - }, - UserOpCancellerBroadcastChannelMessages::Start => { - warn!("msg received to start the {name} {note}"); - is_enabled = true; - continue 'user_op_canceller_loop - }, - UserOpCancellerBroadcastChannelMessages::CoreConnected => { - warn!("core connected message received in {name} {note}"); - core_is_connected = true; - continue 'user_op_canceller_loop - }, - UserOpCancellerBroadcastChannelMessages::CoreDisconnected => { - warn!("core disconnected message received in {name} {note}"); - core_is_connected = false; - continue 'user_op_canceller_loop - }, - } - }, - Err(e) => break 'user_op_canceller_loop Err(e), - } - }, - r = user_op_canceller_rx.recv() , if is_enabled && core_is_connected => match r { - Some(UserOpCancellerMessages::SetFrequency(new_frequency)) => { - frequency = new_frequency; - info!("updated user op canceller frequency to {new_frequency}"); - continue 'user_op_canceller_loop - }, - Some(UserOpCancellerMessages::CancelUserOps) => { - match cancel_user_ops( - &config, - websocket_tx.clone(), - eth_rpc_senders.clone(), - &broadcasting_pk, - ).await { - Ok(_) => { - info!("finished handling user op cancellation request"); - } - Err(SentinelError::UserOp(boxed_user_op_error)) => match *boxed_user_op_error { - UserOpError::InsufficientBalance { have, need } => { - error!("!!! WARNING !!!"); - error!("!!! WARNING !!!"); - error!("!!! WARNING !!!"); - warn!(">>> insufficient balance to cancel a user op - have: {have}, need: {need} <<<"); - error!("!!! WARNING !!!"); - error!("!!! WARNING !!!"); - error!("!!! 
WARNING !!!"); - continue 'user_op_canceller_loop - }, - e => { - error!("unhandled user op error: {e}"); - break 'user_op_canceller_loop Err(e.into()) - } - }, - Err(e) => { - error!("{e}"); - } - }; - continue 'user_op_canceller_loop - }, - None => { - let m = "all {name} senders dropped!"; - warn!("{m}"); - break 'user_op_canceller_loop Err(SentinelError::Custom(name.into())) - }, - }, - _ = tokio::signal::ctrl_c() => { - warn!("{name} shutting down..."); - break 'user_op_canceller_loop Err(SentinelError::SigInt(name.into())) - }, - else => { - warn!("in {name} `else` branch, {name} is currently {}abled", if is_enabled { "en" } else { "dis" }); - continue 'user_op_canceller_loop - }, - } - } -} diff --git a/v3_bridges/sentinel-strongbox/src/android/handlers/process_batch.rs b/v3_bridges/sentinel-strongbox/src/android/handlers/process_batch.rs index c3f426eb7..a087c245e 100644 --- a/v3_bridges/sentinel-strongbox/src/android/handlers/process_batch.rs +++ b/v3_bridges/sentinel-strongbox/src/android/handlers/process_batch.rs @@ -11,7 +11,7 @@ use serde_json::json; use crate::android::State; pub fn process_batch(args: WebSocketMessagesProcessBatchArgs, state: State) -> Result { - let network_id = args.network_id(); + let network_config = args.network_config(); let sentinel_address = ChainDbUtils::new(state.db()).get_signing_address()?; let result = process_batch_of_blocks( @@ -19,7 +19,7 @@ pub fn process_batch(args: WebSocketMessagesProcessBatchArgs, state: State) -> R args.pnetwork_hub(), args.sub_mat_batch(), *args.validate(), - network_id, + network_config, *args.reprocess(), *args.dry_run(), *args.governance_address(), diff --git a/v3_bridges/sentinel-strongbox/src/android/state.rs b/v3_bridges/sentinel-strongbox/src/android/state.rs index 1fdb6818b..29ce4e09d 100644 --- a/v3_bridges/sentinel-strongbox/src/android/state.rs +++ b/v3_bridges/sentinel-strongbox/src/android/state.rs @@ -44,6 +44,7 @@ impl<'a> State<'a> { let input_string: String = env.get_string(input)?.into(); let msg = WebSocketMessagesEncodable::try_from(input_string)?; let strongbox = Strongbox::new(env, strongbox_java_class); + strongbox.initialize_keystore(); Ok(State { env, msg, diff --git a/v3_bridges/sentinel-strongbox/src/android/strongbox.rs b/v3_bridges/sentinel-strongbox/src/android/strongbox.rs index 685772ee9..0604b5212 100644 --- a/v3_bridges/sentinel-strongbox/src/android/strongbox.rs +++ b/v3_bridges/sentinel-strongbox/src/android/strongbox.rs @@ -88,7 +88,7 @@ impl<'a> Strongbox<'a> { } } - fn initialize_keystore(&self) -> Result<(), SentinelError> { + pub fn initialize_keystore(&self) -> Result<(), SentinelError> { if matches!(self.check_keystore_is_initialized(), Ok(true)) { debug!("keystore already initialized!"); Ok(())