From 9dd95371e86ddd8cac67db9d6f56268d6771e38a Mon Sep 17 00:00:00 2001 From: Shanith K K Date: Tue, 23 Apr 2024 16:36:57 +0530 Subject: [PATCH 1/3] chore: context implementation for event feeder --- event_feed/Cargo.toml | 1 + event_feed/src/common/context.rs | 14 ++++++ event_feed/src/common/mod.rs | 8 ++-- event_feed/src/icon/feeder.rs | 14 +++++- event_feed/src/icon/tests.rs | 6 +++ event_feed/src/main.rs | 72 ++++++++++++++++++++++++------- event_feed/src/substrate/tests.rs | 2 + event_feed/src/substrate/types.rs | 15 ++++++- 8 files changed, 112 insertions(+), 20 deletions(-) create mode 100644 event_feed/src/common/context.rs diff --git a/event_feed/Cargo.toml b/event_feed/Cargo.toml index fd29256..13b4a3a 100644 --- a/event_feed/Cargo.toml +++ b/event_feed/Cargo.toml @@ -16,6 +16,7 @@ envy = "0.4" anyhow = "1.0.82" icon-sdk = '1.2.0' runtime = {path = "../runtime/lite"} +async-trait = '0.1.80' [dependencies.tokio] version = '1.36.0' diff --git a/event_feed/src/common/context.rs b/event_feed/src/common/context.rs new file mode 100644 index 0000000..1d8a4fd --- /dev/null +++ b/event_feed/src/common/context.rs @@ -0,0 +1,14 @@ + +use std::sync::Arc; + +use super::*; + +pub struct Context{ + pub feed_client: Arc, +} + +impl Context { + pub fn new(client: Arc) -> Self{ + Context { feed_client: client } + } +} diff --git a/event_feed/src/common/mod.rs b/event_feed/src/common/mod.rs index 705f3de..36ff8ba 100644 --- a/event_feed/src/common/mod.rs +++ b/event_feed/src/common/mod.rs @@ -4,14 +4,16 @@ use serde::Deserialize; use serde_json::Value; mod kuska_client; pub use kuska_client::*; +mod context; +pub use context::*; - -pub trait EventFeeder{ - fn event_feed(&self, cb: fn(events: Vec)); +pub trait EventFeeder: Send + 'static { + fn event_feeder(&self, cb: &dyn Fn(Vec)); } #[derive(Deserialize, Debug, Clone)] pub struct ChainConfig { + pub chain: String, pub node_url: String, #[serde(default)] pub event_filter: String, diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index 60db7e8..5fbc475 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -1,3 +1,5 @@ +use crate::common::EventFeeder; + use super::*; pub struct IconFeed { @@ -79,7 +81,7 @@ impl IconFeed { Ok(events_filter & score_filter) } - pub async fn event_feed(&self, cb: fn(events: Vec)) -> Result<()> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let mut latest_height = get_icon_block_height(&self.icon_service).await?; let mut old_height = latest_height - 1; @@ -126,3 +128,13 @@ impl IconFeed { } } } + +impl EventFeeder for IconFeed { + fn event_feeder(&self, cb: &dyn Fn(Vec)) { + let r = async { + // Simulate an async operation that resolves to a value + self.event_feed(cb).await.unwrap(); + }; + + } +} diff --git a/event_feed/src/icon/tests.rs b/event_feed/src/icon/tests.rs index 523009c..72644b7 100644 --- a/event_feed/src/icon/tests.rs +++ b/event_feed/src/icon/tests.rs @@ -4,6 +4,7 @@ mod tests { #[tokio::test] async fn test_filter_events_true() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXTransfer".to_string(), contract_filter: "".to_string(), @@ -30,6 +31,7 @@ mod tests { #[tokio::test] async fn test_filter_events_false() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXIssued".to_string(), contract_filter: "".to_string(), @@ -52,6 +54,7 @@ mod tests { #[tokio::test] async fn test_filter_score_true() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "".to_string(), contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(), @@ -73,6 +76,7 @@ mod tests { #[tokio::test] async fn test_filter_score_false() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "".to_string(), contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(), @@ -94,6 +98,7 @@ mod tests { #[tokio::test] async fn test_filter_event_and_score_true() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "Transfer".to_string(), contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(), @@ -115,6 +120,7 @@ mod tests { #[tokio::test] async fn test_filter_event_and_score_false() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXIssued".to_string(), contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(), diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index cb0927d..d199140 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -1,5 +1,10 @@ +use std::borrow::BorrowMut; +use std::sync::Arc; + +use crate::common::Context; use crate::common::Producer; use common::{ChainConfig, EventFeeder, ProducerConfig}; +use icon::IconFeed; use substrate::types::PolkadotFeed; use subxt::PolkadotConfig; @@ -18,19 +23,56 @@ async fn main() { let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); ssb_client.accept_invite().await.unwrap(); - let polkadot = PolkadotFeed::::new(chain_config).await; - polkadot - .event_feed(&|e| { - for i in e { - tokio::spawn(async move { - let producer_config = envy::from_env::().unwrap(); - let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); - ssb_client - .publish_feed(i) - .await - .expect("Failed to send event"); - }); - } - }) - .await; + match chain_config.clone().chain.to_lowercase().as_str() { + "substrate" => { + let s = Arc::new(PolkadotFeed::::new(chain_config.clone()).await); + let feed = Context::new(s); + feed.feed_client + .event_feed(&|e| { + for i in e { + tokio::spawn(async move { + let producer_config = envy::from_env::().unwrap(); + let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); + ssb_client + .publish_feed(i) + .await + .expect("Failed to send event"); + }); + } + }) + .await; + } + "icon" => { + let s = Arc::new(IconFeed::new(chain_config.clone()).unwrap()); + let feed = Context::new(s); + feed.feed_client + .event_feed(&|e| { + for i in e { + tokio::spawn(async move { + let producer_config = envy::from_env::().unwrap(); + let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); + ssb_client + .publish_feed(i) + .await + .expect("Failed to send event"); + }); + } + }) + .await.unwrap(); + } + _ => todo!(), + }; + + let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() { + "substrate" => { + let s = Arc::new(PolkadotFeed::::new(chain_config.clone()).await); + Context::new(s) + } + "icon" => { + let s = Arc::new(IconFeed::new(chain_config.clone()).unwrap()); + Context::new(s) + } + _ => todo!(), + }; + } diff --git a/event_feed/src/substrate/tests.rs b/event_feed/src/substrate/tests.rs index ed60e4b..c64b90c 100644 --- a/event_feed/src/substrate/tests.rs +++ b/event_feed/src/substrate/tests.rs @@ -3,6 +3,7 @@ use crate::ChainConfig; #[tokio::test] async fn test_filter_events() { let config = ChainConfig { + chain:"Susbtrate".to_string(), node_url: "wss://rococo-rpc.polkadot.io".to_string(), event_filter: "balances=Transfer".to_string(), contract_filter: "".to_string(), @@ -17,6 +18,7 @@ async fn test_filter_events() { #[tokio::test] async fn test_filter_events_argument_with_no_method() { let config = ChainConfig { + chain:"Susbtrate".to_string(), node_url: "wss://rococo-rpc.polkadot.io".to_string(), event_filter: "balances=Transfer;system".to_string(), contract_filter: "".to_string(), diff --git a/event_feed/src/substrate/types.rs b/event_feed/src/substrate/types.rs index c476bb1..ba88f20 100644 --- a/event_feed/src/substrate/types.rs +++ b/event_feed/src/substrate/types.rs @@ -1,5 +1,5 @@ use super::*; -use crate::common::ChainConfig; +use crate::common::{ChainConfig, EventFeeder}; #[derive(Debug,Clone)] pub struct PolkadotFeed { @@ -111,3 +111,16 @@ macro_rules! events { .collect::, _>>(); }; } + + +impl EventFeeder for PolkadotFeed{ + fn event_feeder(&self, cb: &dyn Fn( Vec)) { + let r = async { + // Simulate an async operation that resolves to a value + self.event_feed(cb).await; + + }; + } + + +} \ No newline at end of file From 9439bca4a2e4f5304f2e550204d2827e942a4e25 Mon Sep 17 00:00:00 2001 From: Shanith K K Date: Wed, 24 Apr 2024 10:00:57 +0530 Subject: [PATCH 2/3] chore: implementation of context in event feeder --- event_feed/src/common/context.rs | 22 ++++++---- event_feed/src/common/mod.rs | 4 +- event_feed/src/icon/feeder.rs | 12 ------ event_feed/src/main.rs | 68 +++++++++---------------------- event_feed/src/substrate/mod.rs | 6 +-- event_feed/src/substrate/tests.rs | 27 ++++++++---- event_feed/src/substrate/types.rs | 19 ++------- 7 files changed, 58 insertions(+), 100 deletions(-) diff --git a/event_feed/src/common/context.rs b/event_feed/src/common/context.rs index 1d8a4fd..6035156 100644 --- a/event_feed/src/common/context.rs +++ b/event_feed/src/common/context.rs @@ -1,14 +1,20 @@ -use std::sync::Arc; +use crate::IconFeed; +use crate::PolkadotFeed; +use subxt::PolkadotConfig; -use super::*; - -pub struct Context{ - pub feed_client: Arc, +pub enum Context { + PolkadotFeed(PolkadotFeed), + IconFeed(IconFeed), } -impl Context { - pub fn new(client: Arc) -> Self{ - Context { feed_client: client } +impl Context { + pub async fn feed_events(&self, cb: &dyn Fn(Vec)) { + match self { + Context::PolkadotFeed(feed) => feed.event_feed(cb).await, + Context::IconFeed(feed) => { + let _ =feed.event_feed(cb).await; + } + } } } diff --git a/event_feed/src/common/mod.rs b/event_feed/src/common/mod.rs index 36ff8ba..d15afb2 100644 --- a/event_feed/src/common/mod.rs +++ b/event_feed/src/common/mod.rs @@ -7,9 +7,7 @@ pub use kuska_client::*; mod context; pub use context::*; -pub trait EventFeeder: Send + 'static { - fn event_feeder(&self, cb: &dyn Fn(Vec)); -} +pub use super::*; #[derive(Deserialize, Debug, Clone)] pub struct ChainConfig { diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index 5fbc475..ae19174 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -1,5 +1,3 @@ -use crate::common::EventFeeder; - use super::*; pub struct IconFeed { @@ -128,13 +126,3 @@ impl IconFeed { } } } - -impl EventFeeder for IconFeed { - fn event_feeder(&self, cb: &dyn Fn(Vec)) { - let r = async { - // Simulate an async operation that resolves to a value - self.event_feed(cb).await.unwrap(); - }; - - } -} diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index d199140..0a058be 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -1,9 +1,6 @@ -use std::borrow::BorrowMut; -use std::sync::Arc; - use crate::common::Context; use crate::common::Producer; -use common::{ChainConfig, EventFeeder, ProducerConfig}; +use common::{ChainConfig, ProducerConfig}; use icon::IconFeed; use substrate::types::PolkadotFeed; use subxt::PolkadotConfig; @@ -23,56 +20,29 @@ async fn main() { let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); ssb_client.accept_invite().await.unwrap(); - match chain_config.clone().chain.to_lowercase().as_str() { + let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() { "substrate" => { - let s = Arc::new(PolkadotFeed::::new(chain_config.clone()).await); - let feed = Context::new(s); - feed.feed_client - .event_feed(&|e| { - for i in e { - tokio::spawn(async move { - let producer_config = envy::from_env::().unwrap(); - let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); - ssb_client - .publish_feed(i) - .await - .expect("Failed to send event"); - }); - } - }) - .await; + let polkadot_client = PolkadotFeed::::new(chain_config.clone()).await; + Context::PolkadotFeed(polkadot_client) } "icon" => { - let s = Arc::new(IconFeed::new(chain_config.clone()).unwrap()); - let feed = Context::new(s); - feed.feed_client - .event_feed(&|e| { - for i in e { - tokio::spawn(async move { - let producer_config = envy::from_env::().unwrap(); - let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); - ssb_client - .publish_feed(i) - .await - .expect("Failed to send event"); - }); - } - }) - .await.unwrap(); + let icon_client = IconFeed::new(chain_config.clone()).unwrap(); + Context::IconFeed(icon_client) } - _ => todo!(), + _ => panic!("Invalid Chain"), }; - let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() { - "substrate" => { - let s = Arc::new(PolkadotFeed::::new(chain_config.clone()).await); - Context::new(s) - } - "icon" => { - let s = Arc::new(IconFeed::new(chain_config.clone()).unwrap()); - Context::new(s) + feed.feed_events(&|e| { + for i in e { + tokio::spawn(async move { + let producer_config = envy::from_env::().unwrap(); + let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); + ssb_client + .publish_feed(i) + .await + .expect("Failed to send event"); + }); } - _ => todo!(), - }; - + }) + .await; } diff --git a/event_feed/src/substrate/mod.rs b/event_feed/src/substrate/mod.rs index e0a64bf..dcf8e4c 100644 --- a/event_feed/src/substrate/mod.rs +++ b/event_feed/src/substrate/mod.rs @@ -1,8 +1,8 @@ -pub mod types; -use types::*; +#[cfg(test)] mod tests; +pub mod types; use serde::{Deserialize, Serialize}; -use subxt::{Config, OnlineClient, PolkadotConfig}; +use subxt::{OnlineClient, PolkadotConfig}; #[subxt::subxt(runtime_metadata_path = "./src/common/utils/polkadot_metadata_full.scale")] pub mod polkadot {} diff --git a/event_feed/src/substrate/tests.rs b/event_feed/src/substrate/tests.rs index c64b90c..98108e2 100644 --- a/event_feed/src/substrate/tests.rs +++ b/event_feed/src/substrate/tests.rs @@ -1,32 +1,41 @@ +use tests::types::PolkadotFeed; + use super::*; +#[cfg(test)] use crate::ChainConfig; #[tokio::test] async fn test_filter_events() { let config = ChainConfig { - chain:"Susbtrate".to_string(), + chain: "Susbtrate".to_string(), node_url: "wss://rococo-rpc.polkadot.io".to_string(), event_filter: "balances=Transfer".to_string(), contract_filter: "".to_string(), }; let polkadot = PolkadotFeed::::new(config).await; - let result = polkadot - .split_filter(); + let result = polkadot.split_filter(); - assert_eq!(result, std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"])])); + assert_eq!( + result, + std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"])]) + ); } #[tokio::test] async fn test_filter_events_argument_with_no_method() { let config = ChainConfig { - chain:"Susbtrate".to_string(), + chain: "Susbtrate".to_string(), node_url: "wss://rococo-rpc.polkadot.io".to_string(), event_filter: "balances=Transfer;system".to_string(), contract_filter: "".to_string(), }; let polkadot = PolkadotFeed::::new(config).await; - let result = polkadot - .split_filter(); + let result = polkadot.split_filter(); - assert_eq!(result, std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"]), ("system".to_string(), vec![])])); + assert_eq!( + result, + std::collections::HashMap::from([ + ("balances".to_string(), vec!["Transfer"]), + ("system".to_string(), vec![]) + ]) + ); } - diff --git a/event_feed/src/substrate/types.rs b/event_feed/src/substrate/types.rs index ba88f20..dc6ff4e 100644 --- a/event_feed/src/substrate/types.rs +++ b/event_feed/src/substrate/types.rs @@ -1,7 +1,7 @@ use super::*; -use crate::common::{ChainConfig, EventFeeder}; +use crate::common::ChainConfig; -#[derive(Debug,Clone)] +#[derive(Debug, Clone)] pub struct PolkadotFeed { chain_config: ChainConfig, @@ -26,7 +26,7 @@ impl PolkadotFeed { /// asynchronous function that takes a callback function `cb` as a parameter. This function /// subscribes to finalized blocks on the Polkadot chain and processes the extrinsics and events /// within those blocks. - pub async fn event_feed(&self, cb: &dyn Fn( Vec)) { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) { let mut blocks_sub = self.client.blocks().subscribe_finalized().await.unwrap(); // For each block, print a bunch of information about it: @@ -111,16 +111,3 @@ macro_rules! events { .collect::, _>>(); }; } - - -impl EventFeeder for PolkadotFeed{ - fn event_feeder(&self, cb: &dyn Fn( Vec)) { - let r = async { - // Simulate an async operation that resolves to a value - self.event_feed(cb).await; - - }; - } - - -} \ No newline at end of file From fcddd42d77253b3efea1b1d83d76a2ef52b899e4 Mon Sep 17 00:00:00 2001 From: Shanith K K Date: Wed, 24 Apr 2024 10:50:23 +0530 Subject: [PATCH 3/3] chore: error handling substrate event feed --- event_feed/src/common/context.rs | 6 +++--- event_feed/src/main.rs | 4 ++-- event_feed/src/substrate/mod.rs | 1 + event_feed/src/substrate/tests.rs | 4 ++-- event_feed/src/substrate/types.rs | 16 ++++++++-------- 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/event_feed/src/common/context.rs b/event_feed/src/common/context.rs index 6035156..0bdac6b 100644 --- a/event_feed/src/common/context.rs +++ b/event_feed/src/common/context.rs @@ -1,7 +1,7 @@ - use crate::IconFeed; use crate::PolkadotFeed; use subxt::PolkadotConfig; +use anyhow::Result; pub enum Context { PolkadotFeed(PolkadotFeed), @@ -9,11 +9,11 @@ pub enum Context { } impl Context { - pub async fn feed_events(&self, cb: &dyn Fn(Vec)) { + pub async fn feed_events(&self, cb: &dyn Fn(Vec)) -> Result<()>{ match self { Context::PolkadotFeed(feed) => feed.event_feed(cb).await, Context::IconFeed(feed) => { - let _ =feed.event_feed(cb).await; + feed.event_feed(cb).await } } } diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index 0a058be..0598beb 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -22,7 +22,7 @@ async fn main() { let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() { "substrate" => { - let polkadot_client = PolkadotFeed::::new(chain_config.clone()).await; + let polkadot_client = PolkadotFeed::::new(chain_config.clone()).await.unwrap(); Context::PolkadotFeed(polkadot_client) } "icon" => { @@ -32,7 +32,7 @@ async fn main() { _ => panic!("Invalid Chain"), }; - feed.feed_events(&|e| { + let _ = feed.feed_events(&|e| { for i in e { tokio::spawn(async move { let producer_config = envy::from_env::().unwrap(); diff --git a/event_feed/src/substrate/mod.rs b/event_feed/src/substrate/mod.rs index dcf8e4c..85745ff 100644 --- a/event_feed/src/substrate/mod.rs +++ b/event_feed/src/substrate/mod.rs @@ -3,6 +3,7 @@ mod tests; pub mod types; use serde::{Deserialize, Serialize}; use subxt::{OnlineClient, PolkadotConfig}; +use anyhow::Result; #[subxt::subxt(runtime_metadata_path = "./src/common/utils/polkadot_metadata_full.scale")] pub mod polkadot {} diff --git a/event_feed/src/substrate/tests.rs b/event_feed/src/substrate/tests.rs index 98108e2..ac13de1 100644 --- a/event_feed/src/substrate/tests.rs +++ b/event_feed/src/substrate/tests.rs @@ -11,7 +11,7 @@ async fn test_filter_events() { event_filter: "balances=Transfer".to_string(), contract_filter: "".to_string(), }; - let polkadot = PolkadotFeed::::new(config).await; + let polkadot = PolkadotFeed::::new(config).await.unwrap(); let result = polkadot.split_filter(); assert_eq!( @@ -28,7 +28,7 @@ async fn test_filter_events_argument_with_no_method() { event_filter: "balances=Transfer;system".to_string(), contract_filter: "".to_string(), }; - let polkadot = PolkadotFeed::::new(config).await; + let polkadot = PolkadotFeed::::new(config).await.unwrap(); let result = polkadot.split_filter(); assert_eq!( diff --git a/event_feed/src/substrate/types.rs b/event_feed/src/substrate/types.rs index dc6ff4e..dad7803 100644 --- a/event_feed/src/substrate/types.rs +++ b/event_feed/src/substrate/types.rs @@ -10,14 +10,14 @@ pub struct PolkadotFeed { } impl PolkadotFeed { - pub async fn new(chain_config: ChainConfig) -> PolkadotFeed { + pub async fn new(chain_config: ChainConfig) -> Result> { let client = OnlineClient::::from_url(&chain_config.node_url) .await - .unwrap(); - PolkadotFeed { + ?; + Ok(PolkadotFeed { chain_config, client, - } + }) } } @@ -26,8 +26,8 @@ impl PolkadotFeed { /// asynchronous function that takes a callback function `cb` as a parameter. This function /// subscribes to finalized blocks on the Polkadot chain and processes the extrinsics and events /// within those blocks. - pub async fn event_feed(&self, cb: &dyn Fn(Vec)) { - let mut blocks_sub = self.client.blocks().subscribe_finalized().await.unwrap(); + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()>{ + let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; // For each block, print a bunch of information about it: loop { @@ -67,8 +67,8 @@ impl PolkadotFeed { method: event.variant_name().to_string(), field_value: data, }; - let serialize_event = serde_json::to_value(&decode_event); - fetched_events.push(serialize_event.unwrap()); + let serialize_event = serde_json::to_value(&decode_event)?; + fetched_events.push(serialize_event); } } }