Skip to content

Commit

Permalink
Add configurable cl audits table (#94)
Browse files Browse the repository at this point in the history
* Apply cl audits patch

* Make cl audits configurable

* Fixing missing variable.

---------

Co-authored-by: juanito87 <[email protected]>
  • Loading branch information
linuskendall and Juanito87 authored Oct 6, 2023
1 parent 93d02fe commit 89aa1f0
Show file tree
Hide file tree
Showing 20 changed files with 209 additions and 25 deletions.
94 changes: 94 additions & 0 deletions digital_asset_types/src/dao/generated/cl_audits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use std::convert::From;

#[derive(Copy, Clone, Default, Debug, DeriveEntity)]
pub struct Entity;

impl EntityName for Entity {
fn table_name(&self) -> &str {
"cl_audits"
}
}

#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Serialize, Deserialize)]
pub struct Model {
pub id: i64,
pub tree: Vec<u8>,
pub node_idx: i64,
pub leaf_idx: Option<i64>,
pub seq: i64,
pub level: i64,
pub hash: Vec<u8>,
pub created_at: Option<DateTime>,
pub tx: String,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
pub enum Column {
Id,
Tree,
NodeIdx,
LeafIdx,
Seq,
Level,
Hash,
CreatedAt,
Tx,
}

#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)]
pub enum PrimaryKey {
Id,
}

impl PrimaryKeyTrait for PrimaryKey {
type ValueType = i64;
fn auto_increment() -> bool {
true
}
}

#[derive(Copy, Clone, Debug, EnumIter)]
pub enum Relation {}

impl ColumnTrait for Column {
type EntityName = Entity;
fn def(&self) -> ColumnDef {
match self {
Self::Id => ColumnType::BigInteger.def(),
Self::Tree => ColumnType::Binary.def(),
Self::NodeIdx => ColumnType::BigInteger.def(),
Self::LeafIdx => ColumnType::BigInteger.def().null(),
Self::Seq => ColumnType::BigInteger.def(),
Self::Level => ColumnType::BigInteger.def(),
Self::Hash => ColumnType::Binary.def(),
Self::CreatedAt => ColumnType::DateTime.def(),
Self::Tx => ColumnType::String(None).def(),
}
}
}

impl RelationTrait for Relation {
fn def(&self) -> RelationDef {
panic!("No RelationDef")
}
}

impl ActiveModelBehavior for ActiveModel {}

impl From<crate::dao::cl_items::ActiveModel> for ActiveModel {
fn from(item: crate::dao::cl_items::ActiveModel) -> Self {
return ActiveModel {
tree: item.tree,
level: item.level,
node_idx: item.node_idx,
hash: item.hash,
seq: item.seq,
leaf_idx: item.leaf_idx,
..Default::default()
}
}
}
1 change: 1 addition & 0 deletions digital_asset_types/src/dao/generated/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod asset_data;
pub mod asset_grouping;
pub mod asset_v1_account_attachments;
pub mod backfill_items;
pub mod cl_audits;
pub mod cl_items;
pub mod raw_txn;
pub mod sea_orm_active_enums;
Expand Down
1 change: 1 addition & 0 deletions digital_asset_types/src/dao/generated/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use super::asset_data::Entity as AssetData;
pub use super::asset_grouping::Entity as AssetGrouping;
pub use super::asset_v1_account_attachments::Entity as AssetV1AccountAttachments;
pub use super::backfill_items::Entity as BackfillItems;
pub use super::cl_audits::Entity as ClAudits;
pub use super::cl_items::Entity as ClItems;
pub use super::raw_txn::Entity as RawTxn;
pub use super::tasks::Entity as Tasks;
Expand Down
2 changes: 2 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod m20230720_130101_remove_asset_grouping_null_constraints;
mod m20230724_120101_add_group_info_seq;
mod m20230726_013107_remove_not_null_constraint_from_group_value;
mod m20230918_182123_add_raw_name_symbol;
mod m20230919_072154_cl_audits;

pub struct Migrator;

Expand Down Expand Up @@ -65,6 +66,7 @@ impl MigratorTrait for Migrator {
Box::new(m20230724_120101_add_group_info_seq::Migration),
Box::new(m20230726_013107_remove_not_null_constraint_from_group_value::Migration),
Box::new(m20230918_182123_add_raw_name_symbol::Migration),
Box::new(m20230919_072154_cl_audits::Migration),
]
}
}
48 changes: 48 additions & 0 deletions migration/src/m20230919_072154_cl_audits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(ClAudits::Table)
.if_not_exists()
.col(ColumnDef::new(ClAudits::Id).big_integer().not_null().primary_key().auto_increment())
.col(ColumnDef::new(ClAudits::Tree).binary().not_null())
.col(ColumnDef::new(ClAudits::NodeIdx).big_integer().not_null())
.col(ColumnDef::new(ClAudits::LeafIdx).big_integer())
.col(ColumnDef::new(ClAudits::Seq).big_integer().not_null())
.col(ColumnDef::new(ClAudits::Level).big_integer().not_null())
.col(ColumnDef::new(ClAudits::Hash).binary().not_null())
.col(ColumnDef::new(ClAudits::CreatedAt).date_time().default(SimpleExpr::Keyword(Keyword::CurrentTimestamp)).not_null())
.col(ColumnDef::new(ClAudits::Tx).string().not_null())
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(ClAudits::Table).to_owned())
.await
}
}

/// Learn more at https://docs.rs/sea-query#iden
#[derive(Iden)]
enum ClAudits {
Table,
Id,
Tree,
NodeIdx,
LeafIdx,
Seq,
Level,
Hash,
CreatedAt,
Tx,
}
2 changes: 1 addition & 1 deletion nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub fn account_worker<T: Messenger>(
tokio::spawn(async move {
let source = T::new(config).await;
if let Ok(mut msg) = source {
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, false));
loop {
let e = msg.recv(stream_key, consumption_type.clone()).await;
let mut tasks = JoinSet::new();
Expand Down
4 changes: 3 additions & 1 deletion nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct IngesterConfig {
pub transaction_stream_worker_count: Option<u32>,
pub code_version: Option<&'static str>,
pub background_task_runner_config: Option<BackgroundTaskRunnerConfig>,
pub cl_audits: Option<bool>, // save transaction logs for compressed nfts
}

impl IngesterConfig {
Expand Down Expand Up @@ -124,7 +125,8 @@ pub fn setup_config(config_file: Option<&PathBuf>) -> IngesterConfig {
figment = figment.join(Yaml::file(config_file));
}

let mut config: IngesterConfig = figment
let mut config: IngesterConfig =
figment
.extract()
.map_err(|config_error| IngesterError::ConfigurationError {
msg: format!("{}", config_error),
Expand Down
2 changes: 2 additions & 0 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_STREAM,
);

Expand All @@ -187,6 +188,7 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_BACKFILL_STREAM,
);
}
Expand Down
3 changes: 2 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/burn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ pub async fn burn<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let Some(cl) = &parsing_result.tree_update {
let seq = save_changelog_event(cl, bundle.slot, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
&[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ pub async fn cancel_redeem<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
#[allow(unreachable_patterns)]
return match le.schema {
LeafSchema::V1 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub async fn process<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -36,7 +37,7 @@ where
"Handling collection verification event for {} (verify: {}): {}",
collection, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let id_bytes = match le.schema {
LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub async fn process<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
value: bool,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -40,7 +41,7 @@ where
"Handling creator verification event for creator {} (verify: {}): {}",
creator, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;

let asset_id_bytes = match le.schema {
LeafSchema::V1 {
Expand Down
29 changes: 25 additions & 4 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use crate::error::IngesterError;
use digital_asset_types::dao::{asset, asset_creators, asset_grouping, backfill_items, cl_items};
use digital_asset_types::dao::{asset, asset_creators, asset_grouping, backfill_items, cl_items, cl_audits};
use log::{debug, info};
use mpl_bubblegum::state::metaplex_adapter::Collection;
use sea_orm::{
query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
};
use spl_account_compression::events::ChangeLogEventV1;

use std::convert::From;

pub async fn save_changelog_event<'c, T>(
change_log_event: &ChangeLogEventV1,
slot: u64,
txn_id: &str,
txn: &T,
cl_audits: bool,
) -> Result<u64, IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
insert_change_log(change_log_event, slot, txn).await?;
insert_change_log(change_log_event, slot, txn_id, txn, cl_audits).await?;
Ok(change_log_event.seq)
}

Expand All @@ -26,7 +30,9 @@ fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 {
pub async fn insert_change_log<'c, T>(
change_log_event: &ChangeLogEventV1,
slot: u64,
txn_id: &str,
txn: &T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -37,18 +43,20 @@ where
for p in change_log_event.path.iter() {
let node_idx = p.index as i64;
debug!(
"seq {}, index {} level {}, node {:?}",
"seq {}, index {} level {}, node {:?}, txn: {:?}",
change_log_event.seq,
p.index,
i,
bs58::encode(p.node).into_string()
bs58::encode(p.node).into_string(),
txn_id,
);
let leaf_idx = if i == 0 {
Some(node_idx_to_leaf_idx(node_idx, depth as u32))
} else {
None
};


let item = cl_items::ActiveModel {
tree: Set(tree_id.to_vec()),
level: Set(i),
Expand All @@ -58,6 +66,13 @@ where
leaf_idx: Set(leaf_idx),
..Default::default()
};

let mut audit_item : Option<cl_audits::ActiveModel> = if(cl_audits) {
let mut ai : cl_audits::ActiveModel = item.clone().into();
ai.tx = Set(txn_id.to_string());
Some(ai)
} else { None };

i += 1;
let mut query = cl_items::Entity::insert(item)
.on_conflict(
Expand All @@ -75,6 +90,12 @@ where
txn.execute(query)
.await
.map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;


// Insert the audit item after the insert into cl_items have been completed
if let Some(audit_item) = audit_item {
cl_audits::Entity::insert(audit_item).exec(txn).await?;
}
}

// If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
Expand Down
3 changes: 2 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ pub async fn delegate<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
return match le.schema {
LeafSchema::V1 {
id,
Expand Down
Loading

0 comments on commit 89aa1f0

Please sign in to comment.