diff --git a/event_feed/Cargo.toml b/event_feed/Cargo.toml index a8dc8de..f358aa1 100644 --- a/event_feed/Cargo.toml +++ b/event_feed/Cargo.toml @@ -20,7 +20,10 @@ dirs = '5.0.1' envy = '0.4' anyhow = '1.0.82' icon-sdk = '1.2.0' -tendermint-rpc = {version = "0.35.0", features = ["http-client", "websocket-client"]} +tendermint-rpc = { version = "0.35.0", features = [ + "http-client", + "websocket-client", +] } futures = "0.3.30" base64 = "0.22.0" clap ={ version = "4.5.4", features = [ "derive", "env" ]} @@ -28,10 +31,7 @@ clap ={ version = "4.5.4", features = [ "derive", "env" ]} [dependencies.ethers] version = '2.0.8' default_features = false -features = [ - 'ws', - 'rustls', -] +features = ['ws', 'rustls'] [dependencies.serde] version = '1.0.198' @@ -42,8 +42,4 @@ path = '../runtime/lite' [dependencies.tokio] version = '1.36.0' -features = [ - 'macros', - 'time', - 'rt-multi-thread', -] +features = ['macros', 'time', 'rt-multi-thread'] diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index 2df96d4..d7d85ae 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -1,6 +1,8 @@ use super::*; use crate::common; use anyhow::*; +use futures::StreamExt; +use runtime::{logger::CoreLogger, Logger}; use serde_json::Value; use tendermint_rpc::event::{Event, EventData}; @@ -8,6 +10,7 @@ use tendermint_rpc::event::{Event, EventData}; pub struct CosmosFeed { pub chain_config: ChainConfig, pub events: Vec, + pub logger: CoreLogger, } impl CosmosFeed { @@ -16,7 +19,7 @@ impl CosmosFeed { /// Arguments: /// /// * `chain_config`: An event feed chain configuration. - pub fn new(chain_config: common::ChainConfig) -> CosmosFeed { + pub fn new(chain_config: common::ChainConfig, logger: CoreLogger) -> CosmosFeed { let events = chain_config .event_filter .split(',') @@ -26,6 +29,7 @@ impl CosmosFeed { CosmosFeed { chain_config, events, + logger, } } @@ -38,6 +42,10 @@ impl CosmosFeed { let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) .await .unwrap(); + self.logger.info(&format!( + "Following the chain at {}", + self.chain_config.node_url + )); let driver_handle = tokio::spawn(async move { driver.run().await }); let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap(); @@ -50,10 +58,12 @@ impl CosmosFeed { let filter_events = events .iter() .filter(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config).is_some() + Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) + .is_some() }) .flat_map(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config).unwrap() + Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) + .unwrap() }) .collect::>(); @@ -64,6 +74,7 @@ impl CosmosFeed { } drop(subs); + self.logger.info("Websocket connection closed!!"); let _ = driver_handle.await; Ok(()) } @@ -79,9 +90,12 @@ impl CosmosFeed { /// /// Returns the list of `FeederEvent` objects as json values. fn convert_to_feeder_event( + &self, event: &Event, chain_config: &ChainConfig, ) -> Option> { + self.logger + .debug(&format!("Processing events : {:?}", event)); match event.data.clone() { EventData::LegacyNewBlock { ref block, @@ -91,8 +105,16 @@ impl CosmosFeed { let block = block.as_ref().unwrap(); let block_number = block.header.version.block as usize; let hash_string = block.header.last_commit_hash.map(|h| h.to_string()); + self.logger.info(&format!( + "Processing LegacyNewBlockEvent for block: {}", + block.header.version.block + )); let filtered_events: Vec = if chain_config.event_filter.is_empty() { + self.logger.info(&format!( + "Processing all events from block : {}", + block_number + )); result_begin_block .unwrap() .events @@ -108,6 +130,7 @@ impl CosmosFeed { .map(|e| serde_json::to_value(e).unwrap()) .collect() } else { + self.logger.info("Filtering events based on the event name"); result_begin_block .unwrap() .events @@ -128,8 +151,10 @@ impl CosmosFeed { }; if !filtered_events.is_empty() { + self.logger.info("returning the filtered events"); Some(filtered_events) } else { + self.logger.info("No events matched the filter"); None } } diff --git a/event_feed/src/cosmos/tests.rs b/event_feed/src/cosmos/tests.rs index b4ed721..26a4531 100644 --- a/event_feed/src/cosmos/tests.rs +++ b/event_feed/src/cosmos/tests.rs @@ -1,6 +1,6 @@ mod tests { - use super::super::*; + use runtime::{logger::CoreLogger, Logger}; #[tokio::test] async fn test_cosmos_feed_new() { @@ -10,8 +10,9 @@ mod tests { contract_filter: "".to_string(), chain: "cosmos".to_string(), }; - let cosmos_feed = CosmosFeed::new(chain_config.clone()); + let logger = CoreLogger::new(Some("./event-feed.log")); + let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger); assert_eq!(cosmos_feed.chain_config, chain_config); assert_eq!(cosmos_feed.events.len(), 1); assert!(cosmos_feed.events.contains(&"transfer".to_string())); @@ -29,9 +30,10 @@ mod tests { let callback = |events: Vec| { assert!(events.is_empty()); }; - - let cosmos_feed = CosmosFeed::new(chain_config.clone()); + let logger = CoreLogger::new(Some("./event-feed.log")); + let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger); let result = cosmos_feed.event_feed(&callback).await; + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(result.is_ok()); } @@ -51,8 +53,10 @@ mod tests { // Simulate an error condition assert!(events.is_empty()); }; - let cosmos_feed = CosmosFeed::new(chain_config.clone()); + let logger = CoreLogger::new(Some("./event-feed.log")); + let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger); let _ = cosmos_feed.event_feed(&callback).await; + let _ = tokio::fs::remove_file("./event-feed.log").await; } } diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index f8ac412..30ede70 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -1,18 +1,20 @@ use super::*; +use runtime::{logger::CoreLogger, Logger}; /// Represents an Ethereum blockchain event feed. pub struct EthFeed { eth_service: Provider, events: Vec<(String, H256)>, contracts: Vec, + logger: CoreLogger, } impl EthFeed { /// Creates a new EthFeed instance based on the provided chain configuration. - /// + /// /// Arguments: - /// + /// /// * `config`: An event feed chain configuration. - pub async fn new(config: ChainConfig) -> Result { + pub async fn new(config: ChainConfig, logger: CoreLogger) -> Result { let events = config .event_filter .split(';') @@ -33,15 +35,16 @@ impl EthFeed { eth_service: client, events, contracts, + logger, }; Ok(eth_feed) } /// Fetches the events from the Ethereum chain and returns it through the provided callback function. - /// + /// /// # Arguments: - /// + /// /// * `cb`: A callback function to return the events to the caller. pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let client = Arc::new(&self.eth_service); @@ -58,6 +61,10 @@ impl EthFeed { let filter = Filter::new().from_block(last_block - 25).events(events); let mut stream = client.subscribe_logs(&filter).await?; + self.logger.info(&format!( + "Subscribed to events with the filter : {:?}", + filter + )); while let Some(log) = stream.next().await { if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address)) @@ -83,6 +90,10 @@ impl EthFeed { let tx_receipt = client.get_transaction_receipt(tx_hash).await; if let Ok(Some(tx_receipt)) = tx_receipt { + self.logger.info(&format!( + "Received transaction receipt for the tx_hash : {:?}", + tx_hash + )); let mut logs = Vec::::new(); for log in tx_receipt.logs.iter() { diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index d57f9ca..2a5f346 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -1,10 +1,12 @@ use super::*; +use runtime::{logger::CoreLogger, Logger}; /// Represents the icon blockchain event feed which contains the endpoint and the filters. pub struct IconFeed { icon_service: IconService, events: Vec, score: Vec, + logger: CoreLogger, } impl IconFeed { @@ -17,7 +19,7 @@ impl IconFeed { /// Returns: /// /// The `new` function is returning a `Result` containing an `IconFeed` struct. - pub fn new(config: ChainConfig) -> Result { + pub fn new(config: ChainConfig, logger: CoreLogger) -> Result { let events = config .event_filter .split(',') @@ -38,6 +40,7 @@ impl IconFeed { icon_service, events, score, + logger, }; Ok(icon_feed) @@ -59,6 +62,10 @@ impl IconFeed { if !self.events.is_empty() || !self.score.is_empty() { let tx_hash: String = serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); + self.logger.info(&format!( + "Filtering the events with the tx_hash : {:?}", + tx_hash + )); let event_logs = get_event_logs_by_tx_hash(&self.icon_service, &tx_hash).await?; @@ -66,11 +73,15 @@ impl IconFeed { if !&self.score.is_empty() { for filter_score in &self.score { if filter_score == &event_log.score_address { + self.logger + .info(&format!("Matched the score filter : {:?}", filter_score)); score_filter = true; break; } } } else { + self.logger + .info("No score filter found, allowing all the transactions"); score_filter = true; } @@ -83,6 +94,8 @@ impl IconFeed { } } else { events_filter = true; + self.logger + .info("No event filter found, allowing all the transactions"); } if events_filter && score_filter { @@ -94,8 +107,10 @@ impl IconFeed { events_filter = true; score_filter = true; } - - Ok(events_filter & score_filter) + let result = events_filter && score_filter; + self.logger + .info(&format!("Filtering the result : {:?}", result)); + Ok(result) } /// Fetches the events from the Icon chain and returns it through the provided callback function. @@ -106,7 +121,10 @@ impl IconFeed { pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let mut latest_height = get_icon_block_height(&self.icon_service).await?; let mut old_height = latest_height - 1; - + self.logger.info(&format!( + "Event feed started at {:?}, {:?}", + latest_height, old_height + )); loop { if old_height < latest_height { let block = match self @@ -137,12 +155,19 @@ impl IconFeed { } } + self.logger.info(&format!( + "Filtered the {:?} transactions", + filtered_tx.len() + )); + if !filtered_tx.is_empty() { cb(filtered_tx) } old_height += 1; } else { + self.logger + .info("No new blocks got detected, sleeping for 1 second"); sleep(Duration::from_secs(1)); } diff --git a/event_feed/src/icon/tests.rs b/event_feed/src/icon/tests.rs index 72644b7..cacec8f 100644 --- a/event_feed/src/icon/tests.rs +++ b/event_feed/src/icon/tests.rs @@ -1,15 +1,17 @@ mod tests { use super::super::*; + use runtime::{logger::CoreLogger, Logger}; #[tokio::test] async fn test_filter_events_true() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXTransfer".to_string(), contract_filter: "".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -24,6 +26,7 @@ mod tests { .clone(), ) .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(icon_feed.filter(&tx_list[1]).await.unwrap()); } @@ -31,12 +34,13 @@ mod tests { #[tokio::test] async fn test_filter_events_false() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXIssued".to_string(), contract_filter: "".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -47,6 +51,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(!icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } @@ -54,12 +59,13 @@ mod tests { #[tokio::test] async fn test_filter_score_true() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "".to_string(), contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -69,6 +75,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } @@ -76,12 +83,13 @@ mod tests { #[tokio::test] async fn test_filter_score_false() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "".to_string(), contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -91,6 +99,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(!icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } @@ -98,12 +107,13 @@ mod tests { #[tokio::test] async fn test_filter_event_and_score_true() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "Transfer".to_string(), contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -113,6 +123,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } @@ -120,12 +131,13 @@ mod tests { #[tokio::test] async fn test_filter_event_and_score_false() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXIssued".to_string(), contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -135,6 +147,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(!icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index f5a3016..c357e34 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -3,7 +3,7 @@ use event_feed::{ }; use std::{fs, path}; use subxt::PolkadotConfig; - +use runtime::{logger::CoreLogger, Logger}; use clap::{Parser, Subcommand}; #[derive(Parser, Debug, Clone)] @@ -29,6 +29,7 @@ pub enum Commands { #[tokio::main] async fn main() -> Result<(), Box> { + let logger = CoreLogger::new(Some("./event-feed.log")); let args = EventFeed::parse(); //TODO config none case @@ -43,25 +44,26 @@ async fn main() -> Result<(), Box> { None => (ChainConfig::default(), ProducerConfig::default()), }; let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); + logger.info("SSB client created"); ssb_client.accept_invite().await.unwrap(); let feed = match args.command { Commands::Substrate => { - let polkadot_client = PolkadotFeed::::new(chain_config.clone()) + let polkadot_client = PolkadotFeed::::new(chain_config.clone(), logger.clone()) .await .unwrap(); Context::PolkadotFeed(polkadot_client) } Commands::Icon => { - let icon_client = IconFeed::new(chain_config.clone()).unwrap(); + let icon_client = IconFeed::new(chain_config.clone(), logger.clone()).unwrap(); Context::IconFeed(icon_client) } Commands::Eth => { - let eth_client = EthFeed::new(chain_config.clone()).await.unwrap(); + let eth_client = EthFeed::new(chain_config.clone(), logger.clone()).await.unwrap(); Context::EthFeed(eth_client) } Commands::Cosmos => { - let cosmos_client = CosmosFeed::new(chain_config.clone()); + let cosmos_client = CosmosFeed::new(chain_config.clone(), logger.clone()); Context::CosmosFeed(cosmos_client) } }; diff --git a/event_feed/src/substrate/feeder.rs b/event_feed/src/substrate/feeder.rs index 4a0be75..9d4d0af 100644 --- a/event_feed/src/substrate/feeder.rs +++ b/event_feed/src/substrate/feeder.rs @@ -1,12 +1,13 @@ use super::*; use crate::common::ChainConfig; +use runtime::{logger::CoreLogger, Logger}; #[derive(Debug, Clone)] /// The `PolkadotFeed` struct in Rust contains a `chain_config` field of type `ChainConfig` and a /// `client` field of type `OnlineClient`. -/// +/// /// Properties: -/// +/// /// * `chain_config`: The `chain_config` property is a field of type `ChainConfig` in the `PolkadotFeed` /// struct. It likely contains configuration settings related to the Polkadot chain that the feed is /// interacting with. @@ -18,33 +19,35 @@ pub struct PolkadotFeed { // Substrate client client: OnlineClient, + logger: CoreLogger, } impl PolkadotFeed { - /// Creates a new `PolkadotFeed` instance based on the provided chain configuration. - /// + /// /// # Arguments - /// + /// /// * `chain_config`: An event feed chain configuration. - pub async fn new(chain_config: ChainConfig) -> Result> { - let client = OnlineClient::::from_url(&chain_config.node_url) - .await - ?; + pub async fn new( + chain_config: ChainConfig, + logger: CoreLogger, + ) -> Result> { + let client = OnlineClient::::from_url(&chain_config.node_url).await?; Ok(PolkadotFeed { chain_config, client, + logger, }) } } impl PolkadotFeed { /// Fetches the events from the Substrate chain and returns it through the provided callback function. - /// + /// /// # Arguments - /// + /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()>{ + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; // For each block, print a bunch of information about it: @@ -97,14 +100,15 @@ impl PolkadotFeed { } /// Splits a string by ';' and then by '=' to create a HasMap. This hashmap is used to filter the events. - /// + /// /// Returns: - /// + /// /// Returns the hashmap with the key as the pallet name and the value as the list of events. pub fn split_filter(&self) -> std::collections::HashMap> { let mut filter_map: std::collections::HashMap> = std::collections::HashMap::new(); if !self.chain_config.event_filter.is_empty() { + self.logger.info("No event filters configured"); for pair in self.chain_config.event_filter.split(';') { if !pair.contains('=') { filter_map.insert(pair.to_string(), vec![]); diff --git a/event_feed/src/substrate/tests.rs b/event_feed/src/substrate/tests.rs index bf3ddac..f25c29e 100644 --- a/event_feed/src/substrate/tests.rs +++ b/event_feed/src/substrate/tests.rs @@ -1,3 +1,4 @@ +use runtime::{logger::CoreLogger, Logger}; use tests::feeder::PolkadotFeed; use super::*; @@ -11,8 +12,12 @@ async fn test_filter_events() { event_filter: "balances=Transfer".to_string(), contract_filter: "".to_string(), }; - let polkadot = PolkadotFeed::::new(config).await.unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let polkadot = PolkadotFeed::::new(config, logger) + .await + .unwrap(); let result = polkadot.split_filter(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert_eq!( result, @@ -28,8 +33,12 @@ async fn test_filter_events_argument_with_no_method() { event_filter: "balances=Transfer;system".to_string(), contract_filter: "".to_string(), }; - let polkadot = PolkadotFeed::::new(config).await.unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let polkadot = PolkadotFeed::::new(config, logger) + .await + .unwrap(); let result = polkadot.split_filter(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert_eq!( result,