From b4b500c693653313a9843253a4373997eb6ecaf9 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Wed, 1 May 2024 17:00:20 +0530 Subject: [PATCH 01/12] chore: add logger for the event feed --- event_feed/src/cosmos/feeder.rs | 20 ++++++++++++++++++++ event_feed/src/eth/feeder.rs | 12 ++++++++++++ event_feed/src/icon/feeder.rs | 29 ++++++++++++++++++++++++++--- event_feed/src/icon/utils.rs | 6 ++++++ event_feed/src/main.rs | 9 ++++++++- event_feed/src/substrate/types.rs | 12 ++++++++---- 6 files changed, 80 insertions(+), 8 deletions(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index b1cc478..ac844b3 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -1,6 +1,7 @@ use super::*; use crate::{common, cosmos}; use anyhow::*; +use runtime::{logger::CoreLogger, Logger}; use serde_json::Value; use tendermint_rpc::event::{Event, EventData}; @@ -62,9 +63,14 @@ impl CosmosFeed { /// /// The `event_feed` function returns a `Result<()>`. pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + let logger = CoreLogger::new(Some("./cosmos-event-feed.log")); let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) .await .unwrap(); + logger.info(&format!( + "Following the publisher {}", + self.chain_config.node_url + )); let driver_handle = tokio::spawn(async move { driver.run().await }); let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap(); @@ -91,6 +97,7 @@ impl CosmosFeed { } drop(subs); + logger.info("Websocket connection closed!!"); let _ = driver_handle.await; Ok(()) } @@ -115,6 +122,8 @@ impl CosmosFeed { event: &Event, chain_config: &ChainConfig, ) -> Option> { + let logger = CoreLogger::new(Some("./event-feed.log")); + logger.debug(&format!("Processing events : {:?}", event)); match event.data.clone() { EventData::LegacyNewBlock { ref block, @@ -124,8 +133,16 @@ impl CosmosFeed { let block = block.as_ref().unwrap(); let block_number = block.header.version.block as usize; let hash_string = block.header.last_commit_hash.map(|h| h.to_string()); + logger.info(&format!( + "Processing LegacyNewBlockEvent for block: {}", + block.header.version.block + )); let filtered_events: Vec = if chain_config.event_filter.is_empty() { + logger.info(&format!( + "Processing all events from block : {}", + block_number + )); result_begin_block .unwrap() .events @@ -141,6 +158,7 @@ impl CosmosFeed { .map(|e| serde_json::to_value(e).unwrap()) .collect() } else { + logger.info("Filtering events based on the event name"); result_begin_block .unwrap() .events @@ -161,8 +179,10 @@ impl CosmosFeed { }; if !filtered_events.is_empty() { + logger.info("returning the filtered events"); Some(filtered_events) } else { + logger.info("No events matched the filter"); None } } diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index a1b3c31..4e08c3c 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -1,4 +1,6 @@ use super::*; +use runtime::{logger::CoreLogger, Logger}; + pub struct EthFeed { eth_service: Provider, events: Vec<(String, H256)>, @@ -33,6 +35,7 @@ impl EthFeed { } pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + let logger = CoreLogger::new(Some("./eth-event-feed.log")); let client = Arc::new(&self.eth_service); let last_block = client @@ -47,6 +50,10 @@ impl EthFeed { let filter = Filter::new().from_block(last_block - 25).events(events); let mut stream = client.subscribe_logs(&filter).await?; + logger.info(&format!( + "Subscribed to events with the filter : {:?}", + filter + )); while let Some(log) = stream.next().await { if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address)) @@ -72,6 +79,10 @@ impl EthFeed { let tx_receipt = client.get_transaction_receipt(tx_hash).await; if let Ok(Some(tx_receipt)) = tx_receipt { + logger.info(&format!( + "Received trabsaction receipt for the tx_hash : {:?}", + tx_hash + )); let mut logs = Vec::::new(); for log in tx_receipt.logs.iter() { @@ -80,6 +91,7 @@ impl EthFeed { { for evt in self.events.iter() { if log.topics[0] == evt.1 { + logger.info(&format!("Matched event : {:?}", evt.0.clone())); logs.push( serde_json::to_value( FeederEvent { diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index ae19174..5573f1c 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -1,4 +1,7 @@ +use crate::substrate::polkadot::runtime_apis::core::Core; + use super::*; +use runtime::{logger::CoreLogger, Logger}; pub struct IconFeed { icon_service: IconService, @@ -34,12 +37,18 @@ impl IconFeed { } pub async fn filter(&self, transaction: &Value) -> Result { + let logger = CoreLogger::new(Some("./icon-event-feed.log")); let mut events_filter = false; let mut score_filter = false; if !self.events.is_empty() || !self.score.is_empty() { + logger.info("Checking the evnt filters or score filters"); let tx_hash: String = serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); + logger.info(&format!( + "Filtering the events with the tx_hash : {:?}", + tx_hash + )); let event_logs = get_event_logs_by_tx_hash(&self.icon_service, &tx_hash).await?; @@ -47,11 +56,13 @@ impl IconFeed { if !&self.score.is_empty() { for filter_score in &self.score { if filter_score == &event_log.score_address { + logger.info(&format!("Matched the score filter : {:?}", filter_score)); score_filter = true; break; } } } else { + logger.info("No score filter found, allowing all the transactions"); score_filter = true; } @@ -64,6 +75,7 @@ impl IconFeed { } } else { events_filter = true; + logger.info("No event filter found, allowing all the transactions"); } if events_filter && score_filter { @@ -75,14 +87,19 @@ impl IconFeed { events_filter = true; score_filter = true; } - - Ok(events_filter & score_filter) + let result = events_filter && score_filter; + logger.info(&format!("Filtering the result : {:?}", result)); + Ok(result) } pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + let logger = CoreLogger::new(Some("./icon-event-feed.log")); let mut latest_height = get_icon_block_height(&self.icon_service).await?; let mut old_height = latest_height - 1; - + logger.info(&format!( + "Event feed started at {:?}, {:?}", + latest_height, old_height + )); loop { if old_height < latest_height { let block = match self @@ -113,12 +130,18 @@ impl IconFeed { } } + logger.info(&format!( + "Filtered the {:?} transactions", + filtered_tx.len() + )); + if !filtered_tx.is_empty() { cb(filtered_tx) } old_height += 1; } else { + logger.info("No new blocks got detected, sleeping for 1 second"); sleep(Duration::from_secs(1)); } diff --git a/event_feed/src/icon/utils.rs b/event_feed/src/icon/utils.rs index a680580..10b7745 100644 --- a/event_feed/src/icon/utils.rs +++ b/event_feed/src/icon/utils.rs @@ -1,6 +1,8 @@ use super::*; +use runtime::{logger::CoreLogger, Logger}; pub async fn get_icon_block_height(icon_service: &IconService) -> Result { + let logger = CoreLogger::new(Some("./icon-event-feed.log")); let latest_block = icon_service .get_last_block() .await @@ -9,6 +11,7 @@ pub async fn get_icon_block_height(icon_service: &IconService) -> Result .and_then(|value| value.get("height")) .unwrap() .clone(); + logger.info(&format!("Got the latest block height {:?}", latest_block)); Ok(serde_json::from_value(latest_block)?) } @@ -17,6 +20,7 @@ pub async fn get_event_logs_by_tx_hash( icon_service: &IconService, tx_hash: &str, ) -> Result> { + let logger = CoreLogger::new(Some("./icon-event-feed.log")); let mut tx_result = icon_service.get_transaction_result(tx_hash).await; while let Err(error) = tx_result { @@ -29,6 +33,7 @@ pub async fn get_event_logs_by_tx_hash( )); } + logger.info(&format!("Retrying the transaction result fetch for tx_hash: {}", tx_hash)); sleep(Duration::from_millis(1000)); tx_result = icon_service.get_transaction_result(tx_hash).await } @@ -42,6 +47,7 @@ pub async fn get_event_logs_by_tx_hash( .clone(), ) .unwrap(); + logger.info(&format!("Fetched {} event logs for the tx_hash {}", event_logs.len(), tx_hash)); Ok(event_logs) } diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index dab8243..2921ddb 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -4,6 +4,7 @@ use common::cosmos::CosmosFeed; use common::{ChainConfig, ProducerConfig}; use eth::EthFeed; use icon::IconFeed; +use runtime::{logger::CoreLogger, Logger}; use substrate::types::PolkadotFeed; use subxt::PolkadotConfig; @@ -12,15 +13,16 @@ mod cosmos; mod eth; mod icon; mod substrate; -// pub use cosmos; #[tokio::main] async fn main() { + let logger = CoreLogger::new(Some("./event-feed.log")); dotenv::dotenv().ok(); let chain_config = envy::from_env::().unwrap(); let producer_config = envy::from_env::().unwrap(); let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); + logger.info("SSB client created"); ssb_client.accept_invite().await.unwrap(); let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() { @@ -28,18 +30,22 @@ async fn main() { let polkadot_client = PolkadotFeed::::new(chain_config.clone()) .await .unwrap(); + logger.info("Polkadot client got created"); Context::PolkadotFeed(polkadot_client) } "icon" => { let icon_client = IconFeed::new(chain_config.clone()).unwrap(); + logger.info("Icon client got created"); Context::IconFeed(icon_client) } "eth" => { let eth_client = EthFeed::new(chain_config.clone()).await.unwrap(); + logger.info("Eth client got created"); Context::EthFeed(eth_client) } "cosmos" => { let cosmos_client = CosmosFeed::new(chain_config.clone()); + logger.info("Cosmos client got created"); Context::CosmosFeed(cosmos_client) } _ => panic!("Invalid Chain"), @@ -48,6 +54,7 @@ async fn main() { let _ = feed .feed_events(&|e| { for i in e { + logger.info(&format!("Process the events : {:?}", i)); tokio::spawn(async move { let producer_config = envy::from_env::().unwrap(); let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); diff --git a/event_feed/src/substrate/types.rs b/event_feed/src/substrate/types.rs index dad7803..3d0079c 100644 --- a/event_feed/src/substrate/types.rs +++ b/event_feed/src/substrate/types.rs @@ -1,5 +1,6 @@ use super::*; use crate::common::ChainConfig; +use runtime::{logger::CoreLogger, Logger}; #[derive(Debug, Clone)] pub struct PolkadotFeed { @@ -11,9 +12,7 @@ pub struct PolkadotFeed { impl PolkadotFeed { pub async fn new(chain_config: ChainConfig) -> Result> { - let client = OnlineClient::::from_url(&chain_config.node_url) - .await - ?; + let client = OnlineClient::::from_url(&chain_config.node_url).await?; Ok(PolkadotFeed { chain_config, client, @@ -26,13 +25,16 @@ impl PolkadotFeed { /// asynchronous function that takes a callback function `cb` as a parameter. This function /// subscribes to finalized blocks on the Polkadot chain and processes the extrinsics and events /// within those blocks. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()>{ + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + let logger = CoreLogger::new(Some("./substrate-event-feed.log")); + logger.info("PolkadotEventFeed: event started"); let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; // For each block, print a bunch of information about it: loop { if let Some(block) = blocks_sub.next().await { let block = block.unwrap(); + logger.info("Processing the obtained block number"); let mut fetched_events = Vec::new(); let extrinsics = block.extrinsics().await.unwrap(); for ext in extrinsics.iter() { @@ -79,9 +81,11 @@ impl PolkadotFeed { } pub fn split_filter(&self) -> std::collections::HashMap> { + let logger = CoreLogger::new(Some("./substrate-event-feed.log")); let mut filter_map: std::collections::HashMap> = std::collections::HashMap::new(); if !self.chain_config.event_filter.is_empty() { + logger.info("No event filters configured"); for pair in self.chain_config.event_filter.split(';') { if !pair.contains("=") { filter_map.insert(pair.to_string(), vec![]); From 43b3d94f3be7ef4096b7c9a578dc1b3518e0f506 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Thu, 2 May 2024 11:25:03 +0530 Subject: [PATCH 02/12] chore: logger for eventfeed --- event_feed/src/cosmos/feeder.rs | 29 +++++++++++++++++------------ event_feed/src/eth/feeder.rs | 11 +++++++---- event_feed/src/icon/feeder.rs | 29 ++++++++++++++++++----------- event_feed/src/icon/utils.rs | 4 ++-- event_feed/src/substrate/types.rs | 13 ++++++++----- 5 files changed, 52 insertions(+), 34 deletions(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index ac844b3..d1866fa 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -20,6 +20,7 @@ use tendermint_rpc::event::{Event, EventData}; pub struct CosmosFeed { pub chain_config: ChainConfig, pub events: Vec, + pub logger: CoreLogger, } /// The above Rust code defines an implementation for a `CosmosFeed` struct. Here is a breakdown of what @@ -43,9 +44,11 @@ impl CosmosFeed { .map(|e| e.to_string()) .filter(|e| !e.is_empty()) .collect::>(); + let logger = CoreLogger::new(Some("./event-feed.log")); let cosmos = CosmosFeed { chain_config, events, + logger, }; cosmos } @@ -63,11 +66,10 @@ impl CosmosFeed { /// /// The `event_feed` function returns a `Result<()>`. pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { - let logger = CoreLogger::new(Some("./cosmos-event-feed.log")); let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url) .await .unwrap(); - logger.info(&format!( + self.logger.info(&format!( "Following the publisher {}", self.chain_config.node_url )); @@ -83,10 +85,12 @@ impl CosmosFeed { let filter_events = events .iter() .filter(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config).is_some() + Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) + .is_some() }) .flat_map(|tendermint_event| { - Self::convert_to_feeder_event(tendermint_event, &self.chain_config).unwrap() + Self::convert_to_feeder_event(self, tendermint_event, &self.chain_config) + .unwrap() }) .collect::>(); @@ -97,7 +101,7 @@ impl CosmosFeed { } drop(subs); - logger.info("Websocket connection closed!!"); + self.logger.info("Websocket connection closed!!"); let _ = driver_handle.await; Ok(()) } @@ -119,11 +123,12 @@ impl CosmosFeed { /// The function `convert_to_feeder_event` returns an `Option` containing a `Vec` of /// `serde_json::Value` elements. fn convert_to_feeder_event( + &self, event: &Event, chain_config: &ChainConfig, ) -> Option> { - let logger = CoreLogger::new(Some("./event-feed.log")); - logger.debug(&format!("Processing events : {:?}", event)); + self.logger + .debug(&format!("Processing events : {:?}", event)); match event.data.clone() { EventData::LegacyNewBlock { ref block, @@ -133,13 +138,13 @@ impl CosmosFeed { let block = block.as_ref().unwrap(); let block_number = block.header.version.block as usize; let hash_string = block.header.last_commit_hash.map(|h| h.to_string()); - logger.info(&format!( + self.logger.info(&format!( "Processing LegacyNewBlockEvent for block: {}", block.header.version.block )); let filtered_events: Vec = if chain_config.event_filter.is_empty() { - logger.info(&format!( + self.logger.info(&format!( "Processing all events from block : {}", block_number )); @@ -158,7 +163,7 @@ impl CosmosFeed { .map(|e| serde_json::to_value(e).unwrap()) .collect() } else { - logger.info("Filtering events based on the event name"); + self.logger.info("Filtering events based on the event name"); result_begin_block .unwrap() .events @@ -179,10 +184,10 @@ impl CosmosFeed { }; if !filtered_events.is_empty() { - logger.info("returning the filtered events"); + self.logger.info("returning the filtered events"); Some(filtered_events) } else { - logger.info("No events matched the filter"); + self.logger.info("No events matched the filter"); None } } diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index 4e08c3c..5982fbb 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -5,6 +5,7 @@ pub struct EthFeed { eth_service: Provider, events: Vec<(String, H256)>, contracts: Vec, + logger: CoreLogger, } impl EthFeed { @@ -24,18 +25,19 @@ impl EthFeed { .collect::>(); let client = Provider::::connect(config.node_url).await?; + let logger = CoreLogger::new(Some("./event-feed.log")); let eth_feed = EthFeed { eth_service: client, events, contracts, + logger, }; Ok(eth_feed) } pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { - let logger = CoreLogger::new(Some("./eth-event-feed.log")); let client = Arc::new(&self.eth_service); let last_block = client @@ -50,7 +52,7 @@ impl EthFeed { let filter = Filter::new().from_block(last_block - 25).events(events); let mut stream = client.subscribe_logs(&filter).await?; - logger.info(&format!( + self.logger.info(&format!( "Subscribed to events with the filter : {:?}", filter )); @@ -79,7 +81,7 @@ impl EthFeed { let tx_receipt = client.get_transaction_receipt(tx_hash).await; if let Ok(Some(tx_receipt)) = tx_receipt { - logger.info(&format!( + self.logger.info(&format!( "Received trabsaction receipt for the tx_hash : {:?}", tx_hash )); @@ -91,7 +93,8 @@ impl EthFeed { { for evt in self.events.iter() { if log.topics[0] == evt.1 { - logger.info(&format!("Matched event : {:?}", evt.0.clone())); + self.logger + .info(&format!("Matched event : {:?}", evt.0.clone())); logs.push( serde_json::to_value( FeederEvent { diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index 5573f1c..f020f1d 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -7,6 +7,7 @@ pub struct IconFeed { icon_service: IconService, events: Vec, score: Vec, + logger: CoreLogger, } impl IconFeed { @@ -26,26 +27,28 @@ impl IconFeed { .collect::>(); let icon_service = icon_service::IconService::new(Some(config.node_url)); + let logger = CoreLogger::new(Some("./event-feed.log")); let icon_feed = IconFeed { icon_service, events, score, + logger, }; Ok(icon_feed) } pub async fn filter(&self, transaction: &Value) -> Result { - let logger = CoreLogger::new(Some("./icon-event-feed.log")); let mut events_filter = false; let mut score_filter = false; if !self.events.is_empty() || !self.score.is_empty() { - logger.info("Checking the evnt filters or score filters"); + self.logger + .info("Checking the evnt filters or score filters"); let tx_hash: String = serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); - logger.info(&format!( + self.logger.info(&format!( "Filtering the events with the tx_hash : {:?}", tx_hash )); @@ -56,13 +59,15 @@ impl IconFeed { if !&self.score.is_empty() { for filter_score in &self.score { if filter_score == &event_log.score_address { - logger.info(&format!("Matched the score filter : {:?}", filter_score)); + self.logger + .info(&format!("Matched the score filter : {:?}", filter_score)); score_filter = true; break; } } } else { - logger.info("No score filter found, allowing all the transactions"); + self.logger + .info("No score filter found, allowing all the transactions"); score_filter = true; } @@ -75,7 +80,8 @@ impl IconFeed { } } else { events_filter = true; - logger.info("No event filter found, allowing all the transactions"); + self.logger + .info("No event filter found, allowing all the transactions"); } if events_filter && score_filter { @@ -88,15 +94,15 @@ impl IconFeed { score_filter = true; } let result = events_filter && score_filter; - logger.info(&format!("Filtering the result : {:?}", result)); + self.logger + .info(&format!("Filtering the result : {:?}", result)); Ok(result) } pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { - let logger = CoreLogger::new(Some("./icon-event-feed.log")); let mut latest_height = get_icon_block_height(&self.icon_service).await?; let mut old_height = latest_height - 1; - logger.info(&format!( + self.logger.info(&format!( "Event feed started at {:?}, {:?}", latest_height, old_height )); @@ -130,7 +136,7 @@ impl IconFeed { } } - logger.info(&format!( + self.logger.info(&format!( "Filtered the {:?} transactions", filtered_tx.len() )); @@ -141,7 +147,8 @@ impl IconFeed { old_height += 1; } else { - logger.info("No new blocks got detected, sleeping for 1 second"); + self.logger + .info("No new blocks got detected, sleeping for 1 second"); sleep(Duration::from_secs(1)); } diff --git a/event_feed/src/icon/utils.rs b/event_feed/src/icon/utils.rs index 10b7745..dd62ae8 100644 --- a/event_feed/src/icon/utils.rs +++ b/event_feed/src/icon/utils.rs @@ -2,7 +2,7 @@ use super::*; use runtime::{logger::CoreLogger, Logger}; pub async fn get_icon_block_height(icon_service: &IconService) -> Result { - let logger = CoreLogger::new(Some("./icon-event-feed.log")); + let logger = CoreLogger::new(Some("./event-feed.log")); let latest_block = icon_service .get_last_block() .await @@ -20,7 +20,7 @@ pub async fn get_event_logs_by_tx_hash( icon_service: &IconService, tx_hash: &str, ) -> Result> { - let logger = CoreLogger::new(Some("./icon-event-feed.log")); + let logger = CoreLogger::new(Some("./event-feed.log")); let mut tx_result = icon_service.get_transaction_result(tx_hash).await; while let Err(error) = tx_result { diff --git a/event_feed/src/substrate/types.rs b/event_feed/src/substrate/types.rs index 3d0079c..98dd585 100644 --- a/event_feed/src/substrate/types.rs +++ b/event_feed/src/substrate/types.rs @@ -1,3 +1,5 @@ +use self::polkadot::runtime_apis::core::Core; + use super::*; use crate::common::ChainConfig; use runtime::{logger::CoreLogger, Logger}; @@ -8,14 +10,17 @@ pub struct PolkadotFeed { // Substrate client client: OnlineClient, + logger: CoreLogger, } impl PolkadotFeed { pub async fn new(chain_config: ChainConfig) -> Result> { let client = OnlineClient::::from_url(&chain_config.node_url).await?; + let logger = CoreLogger::new(Some("./event-feed.log")); Ok(PolkadotFeed { chain_config, client, + logger, }) } } @@ -26,15 +31,14 @@ impl PolkadotFeed { /// subscribes to finalized blocks on the Polkadot chain and processes the extrinsics and events /// within those blocks. pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { - let logger = CoreLogger::new(Some("./substrate-event-feed.log")); - logger.info("PolkadotEventFeed: event started"); + self.logger.info("PolkadotEventFeed: event started"); let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; // For each block, print a bunch of information about it: loop { if let Some(block) = blocks_sub.next().await { let block = block.unwrap(); - logger.info("Processing the obtained block number"); + self.logger.info("Processing the obtained block number"); let mut fetched_events = Vec::new(); let extrinsics = block.extrinsics().await.unwrap(); for ext in extrinsics.iter() { @@ -81,11 +85,10 @@ impl PolkadotFeed { } pub fn split_filter(&self) -> std::collections::HashMap> { - let logger = CoreLogger::new(Some("./substrate-event-feed.log")); let mut filter_map: std::collections::HashMap> = std::collections::HashMap::new(); if !self.chain_config.event_filter.is_empty() { - logger.info("No event filters configured"); + self.logger.info("No event filters configured"); for pair in self.chain_config.event_filter.split(';') { if !pair.contains("=") { filter_map.insert(pair.to_string(), vec![]); From 113b483fb9da1f6b2ed07ca3011f3673f164b7d0 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Thu, 2 May 2024 14:16:08 +0530 Subject: [PATCH 03/12] chore: remove unwanted imports --- event_feed/src/cosmos/feeder.rs | 5 ++++- event_feed/src/cosmos/mod.rs | 10 ---------- event_feed/src/cosmos/tests.rs | 3 +-- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index d1866fa..9f7eb00 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -1,9 +1,12 @@ use super::*; -use crate::{common, cosmos}; +use crate::common::{self, ChainConfig}; use anyhow::*; +use futures::StreamExt; use runtime::{logger::CoreLogger, Logger}; use serde_json::Value; use tendermint_rpc::event::{Event, EventData}; +use tendermint_rpc::query::EventType; +use tendermint_rpc::{SubscriptionClient, WebSocketClient}; /// The `CosmosFeed` struct in Rust defines a feed for handling Cosmos blockchain events with filtering /// capabilities. diff --git a/event_feed/src/cosmos/mod.rs b/event_feed/src/cosmos/mod.rs index 6d5cb96..abfed55 100644 --- a/event_feed/src/cosmos/mod.rs +++ b/event_feed/src/cosmos/mod.rs @@ -1,13 +1,3 @@ -use crate::common::ChainConfig; -use futures::StreamExt; -use std::fmt::Error; -use std::thread::sleep; -use std::time::Duration; -use tendermint_rpc::event::Event; -use tendermint_rpc::query::{EventType, Query}; -use tendermint_rpc::Order::Ascending; -use tendermint_rpc::{Client, SubscriptionClient, WebSocketClient}; - mod feeder; pub use feeder::*; diff --git a/event_feed/src/cosmos/tests.rs b/event_feed/src/cosmos/tests.rs index 168a917..030c198 100644 --- a/event_feed/src/cosmos/tests.rs +++ b/event_feed/src/cosmos/tests.rs @@ -1,7 +1,6 @@ mod tests { - + use crate::common::ChainConfig; use super::super::*; - use crate::common::FeederEvent; #[tokio::test] async fn test_cosmos_feed_new() { From 2bd450425d0ac48571a45ac2fa2e2ab9bede489b Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Thu, 2 May 2024 15:12:17 +0530 Subject: [PATCH 04/12] chore: remove unwanted imports --- event_feed/Cargo.toml | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/event_feed/Cargo.toml b/event_feed/Cargo.toml index 9f414e1..a23300f 100644 --- a/event_feed/Cargo.toml +++ b/event_feed/Cargo.toml @@ -13,17 +13,16 @@ dirs = '5.0.1' envy = '0.4' anyhow = '1.0.82' icon-sdk = '1.2.0' -tendermint-rpc = {version = "0.35.0", features = ["http-client", "websocket-client"]} +tendermint-rpc = { version = "0.35.0", features = [ + "http-client", + "websocket-client", +] } futures = "0.3.30" -base64 = "0.22.0" [dependencies.ethers] version = '2.0.8' default_features = false -features = [ - 'ws', - 'rustls', -] +features = ['ws', 'rustls'] [dependencies.serde] version = '1.0.198' @@ -34,8 +33,4 @@ path = '../runtime/lite' [dependencies.tokio] version = '1.36.0' -features = [ - 'macros', - 'time', - 'rt-multi-thread', -] +features = ['macros', 'time', 'rt-multi-thread'] From 3586f9b48b28fbb7bdb3ea1f06c91bb76c763407 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Thu, 2 May 2024 18:32:29 +0530 Subject: [PATCH 05/12] chore: implement logger --- event_feed/src/cosmos/feeder.rs | 4 ++-- event_feed/src/cosmos/tests.rs | 13 ++++++++----- event_feed/src/eth/feeder.rs | 3 +-- event_feed/src/icon/feeder.rs | 3 +-- event_feed/src/icon/tests.rs | 31 +++++++++++++++++++------------ event_feed/src/icon/utils.rs | 6 ------ event_feed/src/main.rs | 10 +++++----- event_feed/src/substrate/tests.rs | 9 ++++++--- event_feed/src/substrate/types.rs | 3 +-- 9 files changed, 43 insertions(+), 39 deletions(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index 9f7eb00..58e3a9d 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -40,14 +40,14 @@ impl CosmosFeed { /// Returns: /// /// A `CosmosFeed` struct instance is being returned. - pub fn new(chain_config: common::ChainConfig) -> CosmosFeed { + pub fn new(chain_config: common::ChainConfig, logger: CoreLogger) -> CosmosFeed { let events = chain_config .event_filter .split(',') .map(|e| e.to_string()) .filter(|e| !e.is_empty()) .collect::>(); - let logger = CoreLogger::new(Some("./event-feed.log")); + // let logger = CoreLogger::new(Some("./event-feed.log")); let cosmos = CosmosFeed { chain_config, events, diff --git a/event_feed/src/cosmos/tests.rs b/event_feed/src/cosmos/tests.rs index 030c198..9d459f2 100644 --- a/event_feed/src/cosmos/tests.rs +++ b/event_feed/src/cosmos/tests.rs @@ -1,6 +1,7 @@ mod tests { - use crate::common::ChainConfig; use super::super::*; + use crate::common::ChainConfig; + use runtime::{logger::CoreLogger}; #[tokio::test] async fn test_cosmos_feed_new() { @@ -10,8 +11,9 @@ mod tests { contract_filter: "".to_string(), chain: "cosmos".to_string(), }; - let cosmos_feed = CosmosFeed::new(chain_config.clone()); + let logger = CoreLogger::new(Some("./event-feed.log")); + let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger); assert_eq!(cosmos_feed.chain_config, chain_config); assert_eq!(cosmos_feed.events.len(), 1); assert!(cosmos_feed.events.contains(&"transfer".to_string())); @@ -29,8 +31,8 @@ mod tests { let callback = |events: Vec| { assert!(events.is_empty()); }; - - let cosmos_feed = CosmosFeed::new(chain_config.clone()); + let logger = CoreLogger::new(Some("./event-feed.log")); + let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger); let result = cosmos_feed.event_feed(&callback).await; assert!(result.is_ok()); @@ -51,7 +53,8 @@ mod tests { // Simulate an error condition assert!(events.is_empty()); }; - let cosmos_feed = CosmosFeed::new(chain_config.clone()); + let logger = CoreLogger::new(Some("./event-feed.log")); + let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger); let _ = cosmos_feed.event_feed(&callback).await; } diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index 5982fbb..d7fde07 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -9,7 +9,7 @@ pub struct EthFeed { } impl EthFeed { - pub async fn new(config: ChainConfig) -> Result { + pub async fn new(config: ChainConfig, logger: CoreLogger) -> Result { let events = config .event_filter .split(';') @@ -25,7 +25,6 @@ impl EthFeed { .collect::>(); let client = Provider::::connect(config.node_url).await?; - let logger = CoreLogger::new(Some("./event-feed.log")); let eth_feed = EthFeed { eth_service: client, diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index f020f1d..df683d4 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -11,7 +11,7 @@ pub struct IconFeed { } impl IconFeed { - pub fn new(config: ChainConfig) -> Result { + pub fn new(config: ChainConfig, logger: CoreLogger) -> Result { let events = config .event_filter .split(',') @@ -27,7 +27,6 @@ impl IconFeed { .collect::>(); let icon_service = icon_service::IconService::new(Some(config.node_url)); - let logger = CoreLogger::new(Some("./event-feed.log")); let icon_feed = IconFeed { icon_service, diff --git a/event_feed/src/icon/tests.rs b/event_feed/src/icon/tests.rs index 72644b7..df999e9 100644 --- a/event_feed/src/icon/tests.rs +++ b/event_feed/src/icon/tests.rs @@ -1,15 +1,17 @@ mod tests { use super::super::*; + use runtime::{logger::CoreLogger, Logger}; #[tokio::test] async fn test_filter_events_true() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXTransfer".to_string(), contract_filter: "".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -31,12 +33,13 @@ mod tests { #[tokio::test] async fn test_filter_events_false() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXIssued".to_string(), contract_filter: "".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -54,12 +57,13 @@ mod tests { #[tokio::test] async fn test_filter_score_true() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "".to_string(), contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -76,12 +80,13 @@ mod tests { #[tokio::test] async fn test_filter_score_false() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "".to_string(), contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -98,12 +103,13 @@ mod tests { #[tokio::test] async fn test_filter_event_and_score_true() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "Transfer".to_string(), contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); @@ -120,12 +126,13 @@ mod tests { #[tokio::test] async fn test_filter_event_and_score_false() { let config = ChainConfig { - chain:"Icon".to_string(), + chain: "Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXIssued".to_string(), contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(), }; - let icon_feed = IconFeed::new(config).unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let icon_feed = IconFeed::new(config, logger).unwrap(); let icon_service = IconService::new(None); diff --git a/event_feed/src/icon/utils.rs b/event_feed/src/icon/utils.rs index dd62ae8..a680580 100644 --- a/event_feed/src/icon/utils.rs +++ b/event_feed/src/icon/utils.rs @@ -1,8 +1,6 @@ use super::*; -use runtime::{logger::CoreLogger, Logger}; pub async fn get_icon_block_height(icon_service: &IconService) -> Result { - let logger = CoreLogger::new(Some("./event-feed.log")); let latest_block = icon_service .get_last_block() .await @@ -11,7 +9,6 @@ pub async fn get_icon_block_height(icon_service: &IconService) -> Result .and_then(|value| value.get("height")) .unwrap() .clone(); - logger.info(&format!("Got the latest block height {:?}", latest_block)); Ok(serde_json::from_value(latest_block)?) } @@ -20,7 +17,6 @@ pub async fn get_event_logs_by_tx_hash( icon_service: &IconService, tx_hash: &str, ) -> Result> { - let logger = CoreLogger::new(Some("./event-feed.log")); let mut tx_result = icon_service.get_transaction_result(tx_hash).await; while let Err(error) = tx_result { @@ -33,7 +29,6 @@ pub async fn get_event_logs_by_tx_hash( )); } - logger.info(&format!("Retrying the transaction result fetch for tx_hash: {}", tx_hash)); sleep(Duration::from_millis(1000)); tx_result = icon_service.get_transaction_result(tx_hash).await } @@ -47,7 +42,6 @@ pub async fn get_event_logs_by_tx_hash( .clone(), ) .unwrap(); - logger.info(&format!("Fetched {} event logs for the tx_hash {}", event_logs.len(), tx_hash)); Ok(event_logs) } diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index 2921ddb..f0a265f 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -16,8 +16,8 @@ mod substrate; #[tokio::main] async fn main() { - let logger = CoreLogger::new(Some("./event-feed.log")); dotenv::dotenv().ok(); + let logger = CoreLogger::new(Some("./event-feed.log")); let chain_config = envy::from_env::().unwrap(); let producer_config = envy::from_env::().unwrap(); @@ -27,24 +27,24 @@ async fn main() { let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() { "substrate" => { - let polkadot_client = PolkadotFeed::::new(chain_config.clone()) + let polkadot_client = PolkadotFeed::::new(chain_config.clone(), logger.clone()) .await .unwrap(); logger.info("Polkadot client got created"); Context::PolkadotFeed(polkadot_client) } "icon" => { - let icon_client = IconFeed::new(chain_config.clone()).unwrap(); + let icon_client = IconFeed::new(chain_config.clone(), logger.clone()).unwrap(); logger.info("Icon client got created"); Context::IconFeed(icon_client) } "eth" => { - let eth_client = EthFeed::new(chain_config.clone()).await.unwrap(); + let eth_client = EthFeed::new(chain_config.clone(), logger.clone()).await.unwrap(); logger.info("Eth client got created"); Context::EthFeed(eth_client) } "cosmos" => { - let cosmos_client = CosmosFeed::new(chain_config.clone()); + let cosmos_client = CosmosFeed::new(chain_config.clone(), logger.clone()); logger.info("Cosmos client got created"); Context::CosmosFeed(cosmos_client) } diff --git a/event_feed/src/substrate/tests.rs b/event_feed/src/substrate/tests.rs index ac13de1..ce77dbf 100644 --- a/event_feed/src/substrate/tests.rs +++ b/event_feed/src/substrate/tests.rs @@ -1,4 +1,5 @@ use tests::types::PolkadotFeed; +use runtime::{logger::CoreLogger, Logger}; use super::*; #[cfg(test)] @@ -11,7 +12,8 @@ async fn test_filter_events() { event_filter: "balances=Transfer".to_string(), contract_filter: "".to_string(), }; - let polkadot = PolkadotFeed::::new(config).await.unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let polkadot = PolkadotFeed::::new(config, logger).await.unwrap(); let result = polkadot.split_filter(); assert_eq!( @@ -28,9 +30,10 @@ async fn test_filter_events_argument_with_no_method() { event_filter: "balances=Transfer;system".to_string(), contract_filter: "".to_string(), }; - let polkadot = PolkadotFeed::::new(config).await.unwrap(); + let logger = CoreLogger::new(Some("./event-feed.log")); + let polkadot = PolkadotFeed::::new(config, logger).await.unwrap(); let result = polkadot.split_filter(); - + assert_eq!( result, std::collections::HashMap::from([ diff --git a/event_feed/src/substrate/types.rs b/event_feed/src/substrate/types.rs index 98dd585..6517621 100644 --- a/event_feed/src/substrate/types.rs +++ b/event_feed/src/substrate/types.rs @@ -14,9 +14,8 @@ pub struct PolkadotFeed { } impl PolkadotFeed { - pub async fn new(chain_config: ChainConfig) -> Result> { + pub async fn new(chain_config: ChainConfig, logger: CoreLogger) -> Result> { let client = OnlineClient::::from_url(&chain_config.node_url).await?; - let logger = CoreLogger::new(Some("./event-feed.log")); Ok(PolkadotFeed { chain_config, client, From d1054bb74f6ef89a9fbc3bc23195492dc662efd9 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Thu, 2 May 2024 18:33:19 +0530 Subject: [PATCH 06/12] chore: code formating --- event_feed/src/main.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index f0a265f..8d01573 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -27,9 +27,10 @@ async fn main() { let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() { "substrate" => { - let polkadot_client = PolkadotFeed::::new(chain_config.clone(), logger.clone()) - .await - .unwrap(); + let polkadot_client = + PolkadotFeed::::new(chain_config.clone(), logger.clone()) + .await + .unwrap(); logger.info("Polkadot client got created"); Context::PolkadotFeed(polkadot_client) } @@ -39,7 +40,9 @@ async fn main() { Context::IconFeed(icon_client) } "eth" => { - let eth_client = EthFeed::new(chain_config.clone(), logger.clone()).await.unwrap(); + let eth_client = EthFeed::new(chain_config.clone(), logger.clone()) + .await + .unwrap(); logger.info("Eth client got created"); Context::EthFeed(eth_client) } From 13be368ebbd99bf11838bbc1df240984465682e4 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Fri, 3 May 2024 18:29:33 +0530 Subject: [PATCH 07/12] chore: remove commented code --- event_feed/src/cosmos/feeder.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index 58e3a9d..7a69ffd 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -47,7 +47,6 @@ impl CosmosFeed { .map(|e| e.to_string()) .filter(|e| !e.is_empty()) .collect::>(); - // let logger = CoreLogger::new(Some("./event-feed.log")); let cosmos = CosmosFeed { chain_config, events, From e3ea116620f76e8981869831ada29cef8490df8c Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Fri, 3 May 2024 18:43:39 +0530 Subject: [PATCH 08/12] chore: fix code --- event_feed/src/main.rs | 11 ++++++----- event_feed/src/substrate/tests.rs | 12 ++++++++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index 53c49da..c357e34 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -3,7 +3,7 @@ use event_feed::{ }; use std::{fs, path}; use subxt::PolkadotConfig; - +use runtime::{logger::CoreLogger, Logger}; use clap::{Parser, Subcommand}; #[derive(Parser, Debug, Clone)] @@ -29,6 +29,7 @@ pub enum Commands { #[tokio::main] async fn main() -> Result<(), Box> { + let logger = CoreLogger::new(Some("./event-feed.log")); let args = EventFeed::parse(); //TODO config none case @@ -48,21 +49,21 @@ async fn main() -> Result<(), Box> { let feed = match args.command { Commands::Substrate => { - let polkadot_client = PolkadotFeed::::new(chain_config.clone()) + let polkadot_client = PolkadotFeed::::new(chain_config.clone(), logger.clone()) .await .unwrap(); Context::PolkadotFeed(polkadot_client) } Commands::Icon => { - let icon_client = IconFeed::new(chain_config.clone()).unwrap(); + let icon_client = IconFeed::new(chain_config.clone(), logger.clone()).unwrap(); Context::IconFeed(icon_client) } Commands::Eth => { - let eth_client = EthFeed::new(chain_config.clone()).await.unwrap(); + let eth_client = EthFeed::new(chain_config.clone(), logger.clone()).await.unwrap(); Context::EthFeed(eth_client) } Commands::Cosmos => { - let cosmos_client = CosmosFeed::new(chain_config.clone()); + let cosmos_client = CosmosFeed::new(chain_config.clone(), logger.clone()); Context::CosmosFeed(cosmos_client) } }; diff --git a/event_feed/src/substrate/tests.rs b/event_feed/src/substrate/tests.rs index ce77dbf..ad0b1ca 100644 --- a/event_feed/src/substrate/tests.rs +++ b/event_feed/src/substrate/tests.rs @@ -1,5 +1,5 @@ -use tests::types::PolkadotFeed; use runtime::{logger::CoreLogger, Logger}; +use tests::feeder::PolkadotFeed; use super::*; #[cfg(test)] @@ -13,7 +13,9 @@ async fn test_filter_events() { contract_filter: "".to_string(), }; let logger = CoreLogger::new(Some("./event-feed.log")); - let polkadot = PolkadotFeed::::new(config, logger).await.unwrap(); + let polkadot = PolkadotFeed::::new(config, logger) + .await + .unwrap(); let result = polkadot.split_filter(); assert_eq!( @@ -31,9 +33,11 @@ async fn test_filter_events_argument_with_no_method() { contract_filter: "".to_string(), }; let logger = CoreLogger::new(Some("./event-feed.log")); - let polkadot = PolkadotFeed::::new(config, logger).await.unwrap(); + let polkadot = PolkadotFeed::::new(config, logger) + .await + .unwrap(); let result = polkadot.split_filter(); - + assert_eq!( result, std::collections::HashMap::from([ From 2bc9066db61dd9e8d3cb178d5436873821370577 Mon Sep 17 00:00:00 2001 From: Shanith K K Date: Fri, 3 May 2024 19:09:41 +0530 Subject: [PATCH 09/12] chore: log block number in substrate --- event_feed/src/substrate/feeder.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/event_feed/src/substrate/feeder.rs b/event_feed/src/substrate/feeder.rs index 6517621..c21a869 100644 --- a/event_feed/src/substrate/feeder.rs +++ b/event_feed/src/substrate/feeder.rs @@ -1,5 +1,3 @@ -use self::polkadot::runtime_apis::core::Core; - use super::*; use crate::common::ChainConfig; use runtime::{logger::CoreLogger, Logger}; @@ -29,7 +27,7 @@ impl PolkadotFeed { /// asynchronous function that takes a callback function `cb` as a parameter. This function /// subscribes to finalized blocks on the Polkadot chain and processes the extrinsics and events /// within those blocks. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> where <::Header as subxt::config::Header>::Number: std::fmt::Display { self.logger.info("PolkadotEventFeed: event started"); let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; @@ -37,7 +35,7 @@ impl PolkadotFeed { loop { if let Some(block) = blocks_sub.next().await { let block = block.unwrap(); - self.logger.info("Processing the obtained block number"); + self.logger.info(&format!("Processing the obtained block number {}", block.number())); let mut fetched_events = Vec::new(); let extrinsics = block.extrinsics().await.unwrap(); for ext in extrinsics.iter() { From ff65aba0058812cafd3dd6ff60e5bf2eee5a6f05 Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Fri, 3 May 2024 19:21:18 +0530 Subject: [PATCH 10/12] chore: remove unused imports --- event_feed/src/cosmos/feeder.rs | 3 +-- event_feed/src/icon/feeder.rs | 4 ---- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index 7a69ffd..c72488c 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -1,4 +1,3 @@ -use super::*; use crate::common::{self, ChainConfig}; use anyhow::*; use futures::StreamExt; @@ -72,7 +71,7 @@ impl CosmosFeed { .await .unwrap(); self.logger.info(&format!( - "Following the publisher {}", + "Following the chain at {}", self.chain_config.node_url )); let driver_handle = tokio::spawn(async move { driver.run().await }); diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index df683d4..171d196 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -1,5 +1,3 @@ -use crate::substrate::polkadot::runtime_apis::core::Core; - use super::*; use runtime::{logger::CoreLogger, Logger}; @@ -43,8 +41,6 @@ impl IconFeed { let mut score_filter = false; if !self.events.is_empty() || !self.score.is_empty() { - self.logger - .info("Checking the evnt filters or score filters"); let tx_hash: String = serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap(); self.logger.info(&format!( From ac0264de6eaf936980e33ae2e1a987ee430f2cdb Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Fri, 3 May 2024 19:42:50 +0530 Subject: [PATCH 11/12] chore: fixed code --- event_feed/src/cosmos/feeder.rs | 2 +- event_feed/src/cosmos/tests.rs | 3 ++- event_feed/src/eth/feeder.rs | 11 ++++++----- event_feed/src/icon/feeder.rs | 2 +- event_feed/src/substrate/feeder.rs | 29 ++++++++++++++--------------- 5 files changed, 24 insertions(+), 23 deletions(-) diff --git a/event_feed/src/cosmos/feeder.rs b/event_feed/src/cosmos/feeder.rs index ed0419f..d7d85ae 100644 --- a/event_feed/src/cosmos/feeder.rs +++ b/event_feed/src/cosmos/feeder.rs @@ -19,7 +19,7 @@ impl CosmosFeed { /// Arguments: /// /// * `chain_config`: An event feed chain configuration. - pub fn new(chain_config: common::ChainConfig) -> CosmosFeed { + pub fn new(chain_config: common::ChainConfig, logger: CoreLogger) -> CosmosFeed { let events = chain_config .event_filter .split(',') diff --git a/event_feed/src/cosmos/tests.rs b/event_feed/src/cosmos/tests.rs index 624b437..02cfbbc 100644 --- a/event_feed/src/cosmos/tests.rs +++ b/event_feed/src/cosmos/tests.rs @@ -1,6 +1,7 @@ mod tests { use super::super::*; - + use runtime::{logger::CoreLogger, Logger}; + #[tokio::test] async fn test_cosmos_feed_new() { let chain_config = ChainConfig { diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index 7c7a3a1..e064eda 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -1,4 +1,5 @@ use super::*; +use runtime::{logger::CoreLogger, Logger}; /// Represents an Ethereum blockchain event feed. pub struct EthFeed { eth_service: Provider, @@ -9,11 +10,11 @@ pub struct EthFeed { impl EthFeed { /// Creates a new EthFeed instance based on the provided chain configuration. - /// + /// /// Arguments: - /// + /// /// * `config`: An event feed chain configuration. - pub async fn new(config: ChainConfig) -> Result { + pub async fn new(config: ChainConfig, logger: CoreLogger) -> Result { let events = config .event_filter .split(';') @@ -41,9 +42,9 @@ impl EthFeed { } /// Fetches the events from the Ethereum chain and returns it through the provided callback function. - /// + /// /// # Arguments: - /// + /// /// * `cb`: A callback function to return the events to the caller. pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let client = Arc::new(&self.eth_service); diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index 365d506..2a5f346 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -19,7 +19,7 @@ impl IconFeed { /// Returns: /// /// The `new` function is returning a `Result` containing an `IconFeed` struct. - pub fn new(config: ChainConfig) -> Result { + pub fn new(config: ChainConfig, logger: CoreLogger) -> Result { let events = config .event_filter .split(',') diff --git a/event_feed/src/substrate/feeder.rs b/event_feed/src/substrate/feeder.rs index ede60c0..9d4d0af 100644 --- a/event_feed/src/substrate/feeder.rs +++ b/event_feed/src/substrate/feeder.rs @@ -5,9 +5,9 @@ use runtime::{logger::CoreLogger, Logger}; #[derive(Debug, Clone)] /// The `PolkadotFeed` struct in Rust contains a `chain_config` field of type `ChainConfig` and a /// `client` field of type `OnlineClient`. -/// +/// /// Properties: -/// +/// /// * `chain_config`: The `chain_config` property is a field of type `ChainConfig` in the `PolkadotFeed` /// struct. It likely contains configuration settings related to the Polkadot chain that the feed is /// interacting with. @@ -23,16 +23,16 @@ pub struct PolkadotFeed { } impl PolkadotFeed { - /// Creates a new `PolkadotFeed` instance based on the provided chain configuration. - /// + /// /// # Arguments - /// + /// /// * `chain_config`: An event feed chain configuration. - pub async fn new(chain_config: ChainConfig) -> Result> { - let client = OnlineClient::::from_url(&chain_config.node_url) - .await - ?; + pub async fn new( + chain_config: ChainConfig, + logger: CoreLogger, + ) -> Result> { + let client = OnlineClient::::from_url(&chain_config.node_url).await?; Ok(PolkadotFeed { chain_config, client, @@ -43,18 +43,17 @@ impl PolkadotFeed { impl PolkadotFeed { /// Fetches the events from the Substrate chain and returns it through the provided callback function. - /// + /// /// # Arguments - /// + /// /// * `cb`: A callback function to return the events to the caller. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()>{ + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; // For each block, print a bunch of information about it: loop { if let Some(block) = blocks_sub.next().await { let block = block.unwrap(); - self.logger.info(&format!("Processing the obtained block number {}", block.number())); let mut fetched_events = Vec::new(); let extrinsics = block.extrinsics().await.unwrap(); for ext in extrinsics.iter() { @@ -101,9 +100,9 @@ impl PolkadotFeed { } /// Splits a string by ';' and then by '=' to create a HasMap. This hashmap is used to filter the events. - /// + /// /// Returns: - /// + /// /// Returns the hashmap with the key as the pallet name and the value as the list of events. pub fn split_filter(&self) -> std::collections::HashMap> { let mut filter_map: std::collections::HashMap> = From c286fbeff25ef593afd39aa9beac1b15836d353b Mon Sep 17 00:00:00 2001 From: Prathiksha-Nataraja <90592522+Prathiksha-Nataraja@users.noreply.github.com> Date: Mon, 6 May 2024 10:27:51 +0530 Subject: [PATCH 12/12] chore: update code by removing the created file after the test --- event_feed/src/cosmos/tests.rs | 4 +++- event_feed/src/eth/feeder.rs | 2 +- event_feed/src/icon/tests.rs | 6 ++++++ event_feed/src/substrate/tests.rs | 2 ++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/event_feed/src/cosmos/tests.rs b/event_feed/src/cosmos/tests.rs index 02cfbbc..26a4531 100644 --- a/event_feed/src/cosmos/tests.rs +++ b/event_feed/src/cosmos/tests.rs @@ -1,7 +1,7 @@ mod tests { use super::super::*; use runtime::{logger::CoreLogger, Logger}; - + #[tokio::test] async fn test_cosmos_feed_new() { let chain_config = ChainConfig { @@ -33,6 +33,7 @@ mod tests { let logger = CoreLogger::new(Some("./event-feed.log")); let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger); let result = cosmos_feed.event_feed(&callback).await; + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(result.is_ok()); } @@ -56,5 +57,6 @@ mod tests { let cosmos_feed = CosmosFeed::new(chain_config.clone(), logger); let _ = cosmos_feed.event_feed(&callback).await; + let _ = tokio::fs::remove_file("./event-feed.log").await; } } diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs index e064eda..30ede70 100644 --- a/event_feed/src/eth/feeder.rs +++ b/event_feed/src/eth/feeder.rs @@ -91,7 +91,7 @@ impl EthFeed { if let Ok(Some(tx_receipt)) = tx_receipt { self.logger.info(&format!( - "Received trabsaction receipt for the tx_hash : {:?}", + "Received transaction receipt for the tx_hash : {:?}", tx_hash )); let mut logs = Vec::::new(); diff --git a/event_feed/src/icon/tests.rs b/event_feed/src/icon/tests.rs index df999e9..cacec8f 100644 --- a/event_feed/src/icon/tests.rs +++ b/event_feed/src/icon/tests.rs @@ -26,6 +26,7 @@ mod tests { .clone(), ) .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(icon_feed.filter(&tx_list[1]).await.unwrap()); } @@ -50,6 +51,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(!icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } @@ -73,6 +75,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } @@ -96,6 +99,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(!icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } @@ -119,6 +123,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } @@ -142,6 +147,7 @@ mod tests { ) .await .unwrap(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert!(!icon_feed.filter(&tx.get("result").unwrap()).await.unwrap()); } diff --git a/event_feed/src/substrate/tests.rs b/event_feed/src/substrate/tests.rs index ad0b1ca..f25c29e 100644 --- a/event_feed/src/substrate/tests.rs +++ b/event_feed/src/substrate/tests.rs @@ -17,6 +17,7 @@ async fn test_filter_events() { .await .unwrap(); let result = polkadot.split_filter(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert_eq!( result, @@ -37,6 +38,7 @@ async fn test_filter_events_argument_with_no_method() { .await .unwrap(); let result = polkadot.split_filter(); + let _ = tokio::fs::remove_file("./event-feed.log").await; assert_eq!( result,