From 89aa1f036f2c93a15874b85c7fcd6cff14df12d1 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Sat, 7 Oct 2023 03:35:56 +0530 Subject: [PATCH] Add configurable cl audits table (#94) * Apply cl audits patch * Make cl audits configurable * Fixing missing variable. --------- Co-authored-by: juanito87 --- .../src/dao/generated/cl_audits.rs | 94 +++++++++++++++++++ digital_asset_types/src/dao/generated/mod.rs | 1 + .../src/dao/generated/prelude.rs | 1 + migration/src/lib.rs | 2 + migration/src/m20230919_072154_cl_audits.rs | 48 ++++++++++ nft_ingester/src/account_updates.rs | 2 +- nft_ingester/src/config.rs | 4 +- nft_ingester/src/main.rs | 2 + .../program_transformers/bubblegum/burn.rs | 3 +- .../bubblegum/cancel_redeem.rs | 3 +- .../bubblegum/collection_verification.rs | 3 +- .../bubblegum/creator_verification.rs | 3 +- .../src/program_transformers/bubblegum/db.rs | 29 +++++- .../bubblegum/delegate.rs | 3 +- .../program_transformers/bubblegum/mint_v1.rs | 3 +- .../src/program_transformers/bubblegum/mod.rs | 19 ++-- .../program_transformers/bubblegum/redeem.rs | 3 +- .../bubblegum/transfer.rs | 3 +- nft_ingester/src/program_transformers/mod.rs | 5 +- nft_ingester/src/transaction_notifications.rs | 3 +- 20 files changed, 209 insertions(+), 25 deletions(-) create mode 100644 digital_asset_types/src/dao/generated/cl_audits.rs create mode 100644 migration/src/m20230919_072154_cl_audits.rs diff --git a/digital_asset_types/src/dao/generated/cl_audits.rs b/digital_asset_types/src/dao/generated/cl_audits.rs new file mode 100644 index 000000000..a07714202 --- /dev/null +++ b/digital_asset_types/src/dao/generated/cl_audits.rs @@ -0,0 +1,94 @@ +//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3 +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +use std::convert::From; + +#[derive(Copy, Clone, Default, Debug, DeriveEntity)] +pub struct Entity; + +impl EntityName for Entity { + fn table_name(&self) -> &str { + "cl_audits" + } +} + +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Serialize, Deserialize)] +pub struct Model { + pub id: i64, + pub tree: Vec, + pub node_idx: i64, + pub leaf_idx: Option, + pub seq: i64, + pub level: i64, + pub hash: Vec, + pub created_at: Option, + pub tx: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] +pub enum Column { + Id, + Tree, + NodeIdx, + LeafIdx, + Seq, + Level, + Hash, + CreatedAt, + Tx, +} + +#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] +pub enum PrimaryKey { + Id, +} + +impl PrimaryKeyTrait for PrimaryKey { + type ValueType = i64; + fn auto_increment() -> bool { + true + } +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl ColumnTrait for Column { + type EntityName = Entity; + fn def(&self) -> ColumnDef { + match self { + Self::Id => ColumnType::BigInteger.def(), + Self::Tree => ColumnType::Binary.def(), + Self::NodeIdx => ColumnType::BigInteger.def(), + Self::LeafIdx => ColumnType::BigInteger.def().null(), + Self::Seq => ColumnType::BigInteger.def(), + Self::Level => ColumnType::BigInteger.def(), + Self::Hash => ColumnType::Binary.def(), + Self::CreatedAt => ColumnType::DateTime.def(), + Self::Tx => ColumnType::String(None).def(), + } + } +} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} + +impl From for ActiveModel { + fn from(item: crate::dao::cl_items::ActiveModel) -> Self { + return ActiveModel { + tree: item.tree, + level: item.level, + node_idx: item.node_idx, + hash: item.hash, + seq: item.seq, + leaf_idx: item.leaf_idx, + ..Default::default() + } + } +} \ No newline at end of file diff --git a/digital_asset_types/src/dao/generated/mod.rs b/digital_asset_types/src/dao/generated/mod.rs index 1ec6a22f2..5db9a8690 100644 --- a/digital_asset_types/src/dao/generated/mod.rs +++ b/digital_asset_types/src/dao/generated/mod.rs @@ -9,6 +9,7 @@ pub mod asset_data; pub mod asset_grouping; pub mod asset_v1_account_attachments; pub mod backfill_items; +pub mod cl_audits; pub mod cl_items; pub mod raw_txn; pub mod sea_orm_active_enums; diff --git a/digital_asset_types/src/dao/generated/prelude.rs b/digital_asset_types/src/dao/generated/prelude.rs index af5553ec7..79759cd1c 100644 --- a/digital_asset_types/src/dao/generated/prelude.rs +++ b/digital_asset_types/src/dao/generated/prelude.rs @@ -7,6 +7,7 @@ pub use super::asset_data::Entity as AssetData; pub use super::asset_grouping::Entity as AssetGrouping; pub use super::asset_v1_account_attachments::Entity as AssetV1AccountAttachments; pub use super::backfill_items::Entity as BackfillItems; +pub use super::cl_audits::Entity as ClAudits; pub use super::cl_items::Entity as ClItems; pub use super::raw_txn::Entity as RawTxn; pub use super::tasks::Entity as Tasks; diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 58e454bb5..7e38ac93d 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -29,6 +29,7 @@ mod m20230720_130101_remove_asset_grouping_null_constraints; mod m20230724_120101_add_group_info_seq; mod m20230726_013107_remove_not_null_constraint_from_group_value; mod m20230918_182123_add_raw_name_symbol; +mod m20230919_072154_cl_audits; pub struct Migrator; @@ -65,6 +66,7 @@ impl MigratorTrait for Migrator { Box::new(m20230724_120101_add_group_info_seq::Migration), Box::new(m20230726_013107_remove_not_null_constraint_from_group_value::Migration), Box::new(m20230918_182123_add_raw_name_symbol::Migration), + Box::new(m20230919_072154_cl_audits::Migration), ] } } diff --git a/migration/src/m20230919_072154_cl_audits.rs b/migration/src/m20230919_072154_cl_audits.rs new file mode 100644 index 000000000..b9fd6e214 --- /dev/null +++ b/migration/src/m20230919_072154_cl_audits.rs @@ -0,0 +1,48 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(ClAudits::Table) + .if_not_exists() + .col(ColumnDef::new(ClAudits::Id).big_integer().not_null().primary_key().auto_increment()) + .col(ColumnDef::new(ClAudits::Tree).binary().not_null()) + .col(ColumnDef::new(ClAudits::NodeIdx).big_integer().not_null()) + .col(ColumnDef::new(ClAudits::LeafIdx).big_integer()) + .col(ColumnDef::new(ClAudits::Seq).big_integer().not_null()) + .col(ColumnDef::new(ClAudits::Level).big_integer().not_null()) + .col(ColumnDef::new(ClAudits::Hash).binary().not_null()) + .col(ColumnDef::new(ClAudits::CreatedAt).date_time().default(SimpleExpr::Keyword(Keyword::CurrentTimestamp)).not_null()) + .col(ColumnDef::new(ClAudits::Tx).string().not_null()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(ClAudits::Table).to_owned()) + .await + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +enum ClAudits { + Table, + Id, + Tree, + NodeIdx, + LeafIdx, + Seq, + Level, + Hash, + CreatedAt, + Tx, +} diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs index 1d837fce6..9791b7500 100644 --- a/nft_ingester/src/account_updates.rs +++ b/nft_ingester/src/account_updates.rs @@ -26,7 +26,7 @@ pub fn account_worker( tokio::spawn(async move { let source = T::new(config).await; if let Ok(mut msg) = source { - let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender)); + let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, false)); loop { let e = msg.recv(stream_key, consumption_type.clone()).await; let mut tasks = JoinSet::new(); diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index ce4bb9c20..8c488a6b3 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -31,6 +31,7 @@ pub struct IngesterConfig { pub transaction_stream_worker_count: Option, pub code_version: Option<&'static str>, pub background_task_runner_config: Option, + pub cl_audits: Option, // save transaction logs for compressed nfts } impl IngesterConfig { @@ -124,7 +125,8 @@ pub fn setup_config(config_file: Option<&PathBuf>) -> IngesterConfig { figment = figment.join(Yaml::file(config_file)); } - let mut config: IngesterConfig = figment + let mut config: IngesterConfig = + figment .extract() .map_err(|config_error| IngesterError::ConfigurationError { msg: format!("{}", config_error), diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 8f3a7c2c3..a09e7e416 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -174,6 +174,7 @@ pub async fn main() -> Result<(), IngesterError> { } else { ConsumptionType::New }, + config.cl_audits.unwrap_or(false), TRANSACTION_STREAM, ); @@ -187,6 +188,7 @@ pub async fn main() -> Result<(), IngesterError> { } else { ConsumptionType::New }, + config.cl_audits.unwrap_or(false), TRANSACTION_BACKFILL_STREAM, ); } diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs index 9ec27738e..70ddcfcea 100644 --- a/nft_ingester/src/program_transformers/bubblegum/burn.rs +++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs @@ -17,12 +17,13 @@ pub async fn burn<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let Some(cl) = &parsing_result.tree_update { - let seq = save_changelog_event(cl, bundle.slot, txn).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; let leaf_index = cl.index; let (asset_id, _) = Pubkey::find_program_address( &[ diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs index 970b45d0a..1b8f5842a 100644 --- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs @@ -15,12 +15,13 @@ pub async fn cancel_redeem<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, txn).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; #[allow(unreachable_patterns)] return match le.schema { LeafSchema::V1 { diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index 0b109b6e6..28b33f934 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -13,6 +13,7 @@ pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, @@ -36,7 +37,7 @@ where "Handling collection verification event for {} (verify: {}): {}", collection, verify, bundle.txn_id ); - let seq = save_changelog_event(cl, bundle.slot, txn).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; let id_bytes = match le.schema { LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), }; diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index 4ac9ba180..134fe89ca 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -17,6 +17,7 @@ pub async fn process<'c, T>( bundle: &InstructionBundle<'c>, txn: &'c T, value: bool, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, @@ -40,7 +41,7 @@ where "Handling creator verification event for creator {} (verify: {}): {}", creator, verify, bundle.txn_id ); - let seq = save_changelog_event(cl, bundle.slot, txn).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; let asset_id_bytes = match le.schema { LeafSchema::V1 { diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index c2af02e93..c2fb801b6 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,5 +1,5 @@ use crate::error::IngesterError; -use digital_asset_types::dao::{asset, asset_creators, asset_grouping, backfill_items, cl_items}; +use digital_asset_types::dao::{asset, asset_creators, asset_grouping, backfill_items, cl_items, cl_audits}; use log::{debug, info}; use mpl_bubblegum::state::metaplex_adapter::Collection; use sea_orm::{ @@ -7,15 +7,19 @@ use sea_orm::{ }; use spl_account_compression::events::ChangeLogEventV1; +use std::convert::From; + pub async fn save_changelog_event<'c, T>( change_log_event: &ChangeLogEventV1, slot: u64, + txn_id: &str, txn: &T, + cl_audits: bool, ) -> Result where T: ConnectionTrait + TransactionTrait, { - insert_change_log(change_log_event, slot, txn).await?; + insert_change_log(change_log_event, slot, txn_id, txn, cl_audits).await?; Ok(change_log_event.seq) } @@ -26,7 +30,9 @@ fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 { pub async fn insert_change_log<'c, T>( change_log_event: &ChangeLogEventV1, slot: u64, + txn_id: &str, txn: &T, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, @@ -37,11 +43,12 @@ where for p in change_log_event.path.iter() { let node_idx = p.index as i64; debug!( - "seq {}, index {} level {}, node {:?}", + "seq {}, index {} level {}, node {:?}, txn: {:?}", change_log_event.seq, p.index, i, - bs58::encode(p.node).into_string() + bs58::encode(p.node).into_string(), + txn_id, ); let leaf_idx = if i == 0 { Some(node_idx_to_leaf_idx(node_idx, depth as u32)) @@ -49,6 +56,7 @@ where None }; + let item = cl_items::ActiveModel { tree: Set(tree_id.to_vec()), level: Set(i), @@ -58,6 +66,13 @@ where leaf_idx: Set(leaf_idx), ..Default::default() }; + + let mut audit_item : Option = if(cl_audits) { + let mut ai : cl_audits::ActiveModel = item.clone().into(); + ai.tx = Set(txn_id.to_string()); + Some(ai) + } else { None }; + i += 1; let mut query = cl_items::Entity::insert(item) .on_conflict( @@ -75,6 +90,12 @@ where txn.execute(query) .await .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + + // Insert the audit item after the insert into cl_items have been completed + if let Some(audit_item) = audit_item { + cl_audits::Entity::insert(audit_item).exec(txn).await?; + } } // If and only if the entire path of nodes was inserted into the `cl_items` table, then insert diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs index f5865a8a4..88896de64 100644 --- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs +++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs @@ -15,12 +15,13 @@ pub async fn delegate<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, txn).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; return match le.schema { LeafSchema::V1 { id, diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index 0cb3b1f1a..5920a19d5 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -38,6 +38,7 @@ pub async fn mint_v1<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + cl_audits: bool, ) -> Result where T: ConnectionTrait + TransactionTrait, @@ -47,7 +48,7 @@ where &parsing_result.tree_update, &parsing_result.payload, ) { - let seq = save_changelog_event(cl, bundle.slot, txn).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; let metadata = args; #[allow(unreachable_patterns)] return match le.schema { diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs index b170feb65..4b5dc4cfd 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs @@ -27,6 +27,7 @@ pub async fn handle_bubblegum_instruction<'c, T>( bundle: &'c InstructionBundle<'c>, txn: &T, task_manager: &UnboundedSender, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, @@ -57,38 +58,38 @@ where match ix_type { InstructionName::Transfer => { - transfer::transfer(parsing_result, bundle, txn).await?; + transfer::transfer(parsing_result, bundle, txn, cl_audits).await?; } InstructionName::Burn => { - burn::burn(parsing_result, bundle, txn).await?; + burn::burn(parsing_result, bundle, txn, cl_audits).await?; } InstructionName::Delegate => { - delegate::delegate(parsing_result, bundle, txn).await?; + delegate::delegate(parsing_result, bundle, txn, cl_audits).await?; } InstructionName::MintV1 | InstructionName::MintToCollectionV1 => { - let task = mint_v1::mint_v1(parsing_result, bundle, txn).await?; + let task = mint_v1::mint_v1(parsing_result, bundle, txn, cl_audits).await?; task_manager.send(task)?; } InstructionName::Redeem => { - redeem::redeem(parsing_result, bundle, txn).await?; + redeem::redeem(parsing_result, bundle, txn, cl_audits).await?; } InstructionName::CancelRedeem => { - cancel_redeem::cancel_redeem(parsing_result, bundle, txn).await?; + cancel_redeem::cancel_redeem(parsing_result, bundle, txn, cl_audits).await?; } InstructionName::DecompressV1 => { decompress::decompress(parsing_result, bundle, txn).await?; } InstructionName::VerifyCreator => { - creator_verification::process(parsing_result, bundle, txn, true).await?; + creator_verification::process(parsing_result, bundle, txn, true, cl_audits).await?; } InstructionName::UnverifyCreator => { - creator_verification::process(parsing_result, bundle, txn, false).await?; + creator_verification::process(parsing_result, bundle, txn, false, cl_audits).await?; } InstructionName::VerifyCollection | InstructionName::UnverifyCollection | InstructionName::SetAndVerifyCollection => { - collection_verification::process(parsing_result, bundle, txn).await?; + collection_verification::process(parsing_result, bundle, txn, cl_audits).await?; } _ => debug!("Bubblegum: Not Implemented Instruction"), } diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs index 7d4253469..b9b7f2c27 100644 --- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs @@ -14,12 +14,13 @@ pub async fn redeem<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let Some(cl) = &parsing_result.tree_update { - let seq = save_changelog_event(cl, bundle.slot, txn).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; let leaf_index = cl.index; let (asset_id, _) = Pubkey::find_program_address( &[ diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs index 270bdd100..573f33a8f 100644 --- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs +++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs @@ -16,12 +16,13 @@ pub async fn transfer<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, txn).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; #[allow(unreachable_patterns)] return match le.schema { LeafSchema::V1 { diff --git a/nft_ingester/src/program_transformers/mod.rs b/nft_ingester/src/program_transformers/mod.rs index ed2824498..e1af458eb 100644 --- a/nft_ingester/src/program_transformers/mod.rs +++ b/nft_ingester/src/program_transformers/mod.rs @@ -29,10 +29,11 @@ pub struct ProgramTransformer { task_sender: UnboundedSender, matchers: HashMap>, key_set: HashSet, + cl_audits: bool, } impl ProgramTransformer { - pub fn new(pool: PgPool, task_sender: UnboundedSender) -> Self { + pub fn new(pool: PgPool, task_sender: UnboundedSender, cl_audits: bool) -> Self { let mut matchers: HashMap> = HashMap::with_capacity(1); let bgum = BubblegumParser {}; let token_metadata = TokenMetadataParser {}; @@ -50,6 +51,7 @@ impl ProgramTransformer { task_sender, matchers, key_set: hs, + cl_audits: cl_audits, } } @@ -125,6 +127,7 @@ impl ProgramTransformer { &ix, &self.storage, &self.task_sender, + self.cl_audits, ) .await .map_err(|err| { diff --git a/nft_ingester/src/transaction_notifications.rs b/nft_ingester/src/transaction_notifications.rs index 577f4b613..6ca7cc5f4 100644 --- a/nft_ingester/src/transaction_notifications.rs +++ b/nft_ingester/src/transaction_notifications.rs @@ -24,12 +24,13 @@ pub fn transaction_worker( bg_task_sender: UnboundedSender, ack_channel: UnboundedSender<(&'static str, String)>, consumption_type: ConsumptionType, + cl_audits: bool, stream_key: &'static str, ) -> JoinHandle<()> { tokio::spawn(async move { let source = T::new(config).await; if let Ok(mut msg) = source { - let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender)); + let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, cl_audits)); loop { let e = msg.recv(stream_key, consumption_type.clone()).await; let mut tasks = JoinSet::new();