From 9d6ef667ba271d34792838b34868662573751fb7 Mon Sep 17 00:00:00 2001 From: Tahsin Tunan Date: Sat, 20 Jan 2024 07:10:19 +0600 Subject: [PATCH] feat: `getSignaturesForAsset` endpoint on top of new `cl_audits_v2` table (#155) * add migration files for cl_audits_v2 * add types * ingester * add getSignaturesForAsset endpoint * refactor to resolve merge conflict related bugs * address clippy error * rename to get_asset_signatures * add instruction type update_metadata * add error log if instruction is unknown * add sort order changes --------- Co-authored-by: Nicolas Pennie --- das_api/src/api/api_impl.rs | 44 ++++++- das_api/src/api/mod.rs | 29 ++++- das_api/src/builder.rs | 12 ++ .../src/dao/generated/cl_audits_v2.rs | 74 ++++++++++++ digital_asset_types/src/dao/generated/mod.rs | 3 +- .../src/dao/generated/prelude.rs | 4 +- .../src/dao/generated/sea_orm_active_enums.rs | 81 +++++++++++++ digital_asset_types/src/dao/scopes/asset.rs | 99 +++++++++++++++- digital_asset_types/src/dapi/common/asset.rs | 26 +++++ .../src/dapi/get_asset_signatures.rs | 35 ++++++ digital_asset_types/src/dapi/mod.rs | 6 +- digital_asset_types/src/rpc/response.rs | 14 +++ migration/src/lib.rs | 6 + migration/src/m20230919_072154_cl_audits.rs | 2 +- .../src/m20240104_203133_add_cl_audits_v2.rs | 109 ++++++++++++++++++ .../src/m20240104_203328_remove_cl_audits.rs | 57 +++++++++ ...m20240116_130744_add_update_metadata_ix.rs | 24 ++++ .../program_transformers/bubblegum/burn.rs | 4 +- .../bubblegum/cancel_redeem.rs | 4 +- .../bubblegum/collection_verification.rs | 4 +- .../bubblegum/creator_verification.rs | 4 +- .../src/program_transformers/bubblegum/db.rs | 63 +++++++--- .../bubblegum/delegate.rs | 4 +- .../program_transformers/bubblegum/mint_v1.rs | 4 +- .../src/program_transformers/bubblegum/mod.rs | 20 ++-- .../program_transformers/bubblegum/redeem.rs | 4 +- .../bubblegum/transfer.rs | 5 +- .../bubblegum/update_metadata.rs | 4 +- 28 files changed, 695 insertions(+), 50 deletions(-) create mode 100644 digital_asset_types/src/dao/generated/cl_audits_v2.rs create mode 100644 digital_asset_types/src/dapi/get_asset_signatures.rs create mode 100644 migration/src/m20240104_203133_add_cl_audits_v2.rs create mode 100644 migration/src/m20240104_203328_remove_cl_audits.rs create mode 100644 migration/src/m20240116_130744_add_update_metadata_ix.rs diff --git a/das_api/src/api/api_impl.rs b/das_api/src/api/api_impl.rs index 3fa3ee274..7456c5ec2 100644 --- a/das_api/src/api/api_impl.rs +++ b/das_api/src/api/api_impl.rs @@ -7,8 +7,9 @@ use digital_asset_types::{ Cursor, PageOptions, SearchAssetsQuery, }, dapi::{ - get_asset, get_asset_proofs, get_assets, get_assets_by_authority, get_assets_by_creator, - get_assets_by_group, get_assets_by_owner, get_proof_for_asset, search_assets, + get_asset, get_asset_proofs, get_asset_signatures, get_assets, get_assets_by_authority, + get_assets_by_creator, get_assets_by_group, get_assets_by_owner, get_proof_for_asset, + search_assets, }, rpc::{ filter::{AssetSortBy, SearchConditionType}, @@ -461,6 +462,45 @@ impl ApiContract for DasApi { .map_err(Into::into) } + async fn get_asset_signatures( + self: &DasApi, + payload: GetAssetSignatures, + ) -> Result { + let GetAssetSignatures { + id, + limit, + page, + before, + after, + tree, + leaf_index, + cursor, + sort_direction, + } = payload; + + if !((id.is_some() && tree.is_none() && leaf_index.is_none()) + || (id.is_none() && tree.is_some() && leaf_index.is_some())) + { + return Err(DasApiError::ValidationError( + "Must provide either 'id' or both 
'tree' and 'leafIndex'".to_string(), + )); + } + let id = validate_opt_pubkey(&id)?; + let tree = validate_opt_pubkey(&tree)?; + + let page_options = self.validate_pagination(limit, page, &before, &after, &cursor, None)?; + + get_asset_signatures( + &self.db_connection, + id, + tree, + leaf_index, + page_options, + sort_direction, + ) + .await + .map_err(Into::into) + } async fn get_grouping( self: &DasApi, payload: GetGrouping, diff --git a/das_api/src/api/mod.rs b/das_api/src/api/mod.rs index a2db05d66..53959b19c 100644 --- a/das_api/src/api/mod.rs +++ b/das_api/src/api/mod.rs @@ -1,8 +1,8 @@ use crate::DasApiError; use async_trait::async_trait; -use digital_asset_types::rpc::filter::SearchConditionType; +use digital_asset_types::rpc::filter::{AssetSortDirection, SearchConditionType}; use digital_asset_types::rpc::options::Options; -use digital_asset_types::rpc::response::AssetList; +use digital_asset_types::rpc::response::{AssetList, TransactionSignatureList}; use digital_asset_types::rpc::{filter::AssetSorting, response::GetGroupingResponse}; use digital_asset_types::rpc::{Asset, AssetProof, Interface, OwnershipModel, RoyaltyModel}; use open_rpc_derive::{document_rpc, rpc}; @@ -147,6 +147,22 @@ pub struct GetGrouping { pub group_value: String, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Default)] +#[serde(deny_unknown_fields, rename_all = "camelCase")] +pub struct GetAssetSignatures { + pub id: Option, + pub limit: Option, + pub page: Option, + pub before: Option, + pub after: Option, + pub tree: Option, + pub leaf_index: Option, + #[serde(default)] + pub cursor: Option, + #[serde(default)] + pub sort_direction: Option, +} + #[document_rpc] #[async_trait] pub trait ApiContract: Send + Sync + 'static { @@ -220,6 +236,15 @@ pub trait ApiContract: Send + Sync + 'static { summary = "Search for assets by a variety of parameters" )] async fn search_assets(&self, payload: SearchAssets) -> Result; + #[rpc( + name = "getAssetSignatures", + params = "named", + summary = "Get transaction signatures for an asset" + )] + async fn get_asset_signatures( + &self, + payload: GetAssetSignatures, + ) -> Result; #[rpc( name = "getGrouping", params = "named", diff --git a/das_api/src/builder.rs b/das_api/src/builder.rs index 5d0005e36..9e3da1c81 100644 --- a/das_api/src/builder.rs +++ b/das_api/src/builder.rs @@ -94,6 +94,18 @@ impl RpcApiBuilder { )?; module.register_alias("getAssetsByGroup", "get_assets_by_group")?; + module.register_async_method( + "getAssetSignatures", + |rpc_params, rpc_context| async move { + let payload = rpc_params.parse::()?; + rpc_context + .get_asset_signatures(payload) + .await + .map_err(Into::into) + }, + )?; + module.register_alias("getSignaturesForAsset", "getAssetSignatures")?; + module.register_async_method("search_assets", |rpc_params, rpc_context| async move { let payload = rpc_params.parse::()?; rpc_context.search_assets(payload).await.map_err(Into::into) diff --git a/digital_asset_types/src/dao/generated/cl_audits_v2.rs b/digital_asset_types/src/dao/generated/cl_audits_v2.rs new file mode 100644 index 000000000..d1875c254 --- /dev/null +++ b/digital_asset_types/src/dao/generated/cl_audits_v2.rs @@ -0,0 +1,74 @@ +//! SeaORM Entity. 
Generated by sea-orm-codegen 0.9.3 + +use super::sea_orm_active_enums::Instruction; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Copy, Clone, Default, Debug, DeriveEntity)] +pub struct Entity; + +impl EntityName for Entity { + fn table_name(&self) -> &str { + "cl_audits_v2" + } +} + +#[derive(Clone, Debug, PartialEq, DeriveModel, DeriveActiveModel, Serialize, Deserialize)] +pub struct Model { + pub id: i64, + pub tree: Vec, + pub leaf_idx: i64, + pub seq: i64, + pub created_at: DateTime, + pub tx: Vec, + pub instruction: Instruction, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] +pub enum Column { + Id, + Tree, + LeafIdx, + Seq, + CreatedAt, + Tx, + Instruction, +} + +#[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] +pub enum PrimaryKey { + Id, +} + +impl PrimaryKeyTrait for PrimaryKey { + type ValueType = i64; + fn auto_increment() -> bool { + true + } +} + +#[derive(Copy, Clone, Debug, EnumIter)] +pub enum Relation {} + +impl ColumnTrait for Column { + type EntityName = Entity; + fn def(&self) -> ColumnDef { + match self { + Self::Id => ColumnType::BigInteger.def(), + Self::Tree => ColumnType::Binary.def(), + Self::LeafIdx => ColumnType::BigInteger.def(), + Self::Seq => ColumnType::BigInteger.def(), + Self::CreatedAt => ColumnType::DateTime.def(), + Self::Tx => ColumnType::Binary.def(), + Self::Instruction => Instruction::db_type(), + } + } +} + +impl RelationTrait for Relation { + fn def(&self) -> RelationDef { + panic!("No RelationDef") + } +} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/digital_asset_types/src/dao/generated/mod.rs b/digital_asset_types/src/dao/generated/mod.rs index 5db9a8690..68e1db8fa 100644 --- a/digital_asset_types/src/dao/generated/mod.rs +++ b/digital_asset_types/src/dao/generated/mod.rs @@ -1,4 +1,4 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3 +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.11.3 pub mod prelude; @@ -10,6 +10,7 @@ pub mod asset_grouping; pub mod asset_v1_account_attachments; pub mod backfill_items; pub mod cl_audits; +pub mod cl_audits_v2; pub mod cl_items; pub mod raw_txn; pub mod sea_orm_active_enums; diff --git a/digital_asset_types/src/dao/generated/prelude.rs b/digital_asset_types/src/dao/generated/prelude.rs index fd5b03e50..58d11a50a 100644 --- a/digital_asset_types/src/dao/generated/prelude.rs +++ b/digital_asset_types/src/dao/generated/prelude.rs @@ -1,4 +1,4 @@ -//! SeaORM Entity. Generated by sea-orm-codegen 0.9.3 +//! `SeaORM` Entity. 
Generated by sea-orm-codegen 0.11.3 #![allow(unused_imports)] pub use super::asset::Entity as Asset; @@ -8,7 +8,7 @@ pub use super::asset_data::Entity as AssetData; pub use super::asset_grouping::Entity as AssetGrouping; pub use super::asset_v1_account_attachments::Entity as AssetV1AccountAttachments; pub use super::backfill_items::Entity as BackfillItems; -pub use super::cl_audits::Entity as ClAudits; +pub use super::cl_audits_v2::Entity as ClAuditsV2; pub use super::cl_items::Entity as ClItems; pub use super::raw_txn::Entity as RawTxn; pub use super::tasks::Entity as Tasks; diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index 11362ae1c..5688d3821 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -123,3 +123,84 @@ pub enum V1AccountAttachments { #[sea_orm(string_value = "unknown")] Unknown, } + +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "instruction")] +pub enum Instruction { + #[sea_orm(string_value = "burn")] + Burn, + #[sea_orm(string_value = "cancel_redeem")] + CancelRedeem, + #[sea_orm(string_value = "compress")] + Compress, + #[sea_orm(string_value = "decompress_v1")] + DecompressV1, + #[sea_orm(string_value = "delegate")] + Delegate, + #[sea_orm(string_value = "mint_to_collection_v1")] + MintToCollectionV1, + #[sea_orm(string_value = "mint_v1")] + MintV1, + #[sea_orm(string_value = "redeem")] + Redeem, + #[sea_orm(string_value = "set_and_verify_collection")] + SetAndVerifyCollection, + #[sea_orm(string_value = "transfer")] + Transfer, + #[sea_orm(string_value = "unknown")] + Unknown, + #[sea_orm(string_value = "unverify_collection")] + UnverifyCollection, + #[sea_orm(string_value = "unverify_creator")] + UnverifyCreator, + #[sea_orm(string_value = "verify_collection")] + VerifyCollection, + #[sea_orm(string_value = "verify_creator")] + VerifyCreator, + #[sea_orm(string_value = "update_metadata")] + UpdateMetadata, +} +// Added manually for convenience. 
+impl Instruction { + pub fn from_str(s: &str) -> Self { + match s { + "Burn" => Instruction::Burn, + "CancelRedeem" => Instruction::CancelRedeem, + "Compress" => Instruction::Compress, + "DecompressV1" => Instruction::DecompressV1, + "Delegate" => Instruction::Delegate, + "MintToCollectionV1" => Instruction::MintToCollectionV1, + "MintV1" => Instruction::MintV1, + "Redeem" => Instruction::Redeem, + "SetAndVerifyCollection" => Instruction::SetAndVerifyCollection, + "Transfer" => Instruction::Transfer, + "UnverifyCollection" => Instruction::UnverifyCollection, + "UnverifyCreator" => Instruction::UnverifyCreator, + "VerifyCollection" => Instruction::VerifyCollection, + "VerifyCreator" => Instruction::VerifyCreator, + "UpdateMetadata" => Instruction::UpdateMetadata, + _ => Instruction::Unknown, + } + } + + pub const fn to_str(s: &Self) -> &str { + match s { + Instruction::Burn => "Burn", + Instruction::CancelRedeem => "CancelReddem", + Instruction::Compress => "Compress", + Instruction::DecompressV1 => "DecompressV1", + Instruction::Delegate => "Delegate", + Instruction::MintToCollectionV1 => "MintToCollectionV1", + Instruction::MintV1 => "MintV1", + Instruction::Redeem => "Redeem", + Instruction::SetAndVerifyCollection => "SetAndVerifyCollection", + Instruction::Transfer => "Transfer", + Instruction::UnverifyCollection => "UnverifyCollection", + Instruction::UnverifyCreator => "UnverifyCreator", + Instruction::VerifyCollection => "VerifyCollection", + Instruction::VerifyCreator => "VerifyCreator", + Instruction::UpdateMetadata => "UpdateMetadata", + _ => "Unknown", + } + } +} diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index d6324758f..fccec9594 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -1,6 +1,11 @@ -use crate::dao::{ - asset, asset_authority, asset_creators, asset_data, asset_grouping, Cursor, FullAsset, - GroupingSize, Pagination, +use crate::{ + dao::{ + asset::{self}, + asset_authority, asset_creators, asset_data, asset_grouping, cl_audits_v2, + sea_orm_active_enums::Instruction, + Cursor, FullAsset, GroupingSize, Pagination, + }, + rpc::filter::AssetSortDirection, }; use indexmap::IndexMap; use sea_orm::{entity::*, query::*, ConnectionTrait, DbErr, Order}; @@ -427,6 +432,94 @@ pub async fn get_by_id( }) } +pub async fn fetch_transactions( + conn: &impl ConnectionTrait, + tree: Vec, + leaf_idx: i64, + pagination: &Pagination, + limit: u64, + sort_direction: Option, +) -> Result, DbErr> { + // Default sort direction is Desc + // Similar to GetSignaturesForAddress in the Solana API + let sort_direction = sort_direction.unwrap_or(AssetSortDirection::Desc); + let sort_order = match sort_direction { + AssetSortDirection::Asc => sea_orm::Order::Asc, + AssetSortDirection::Desc => sea_orm::Order::Desc, + }; + + let mut stmt = cl_audits_v2::Entity::find().filter(cl_audits_v2::Column::Tree.eq(tree)); + stmt = stmt.filter(cl_audits_v2::Column::LeafIdx.eq(leaf_idx)); + stmt = stmt.order_by(cl_audits_v2::Column::Seq, sort_order.clone()); + + stmt = paginate( + pagination, + limit, + stmt, + sort_order, + cl_audits_v2::Column::Seq, + ); + let transactions = stmt.all(conn).await?; + let transaction_list = transactions + .into_iter() + .map(|transaction| { + let tx = bs58::encode(transaction.tx).into_string(); + let ix = Instruction::to_str(&transaction.instruction).to_string(); + (tx, ix) + }) + .collect(); + + Ok(transaction_list) +} + +pub async fn get_asset_signatures( + 
conn: &impl ConnectionTrait, + asset_id: Option>, + tree_id: Option>, + leaf_idx: Option, + pagination: &Pagination, + limit: u64, + sort_direction: Option, +) -> Result, DbErr> { + // if tree_id and leaf_idx are provided, use them directly to fetch transactions + if let (Some(tree_id), Some(leaf_idx)) = (tree_id, leaf_idx) { + let transactions = + fetch_transactions(conn, tree_id, leaf_idx, pagination, limit, sort_direction).await?; + return Ok(transactions); + } + + if asset_id.is_none() { + return Err(DbErr::Custom( + "Either 'id' or both 'tree' and 'leafIndex' must be provided".to_string(), + )); + } + + // if only asset_id is provided, fetch the latest tree and leaf_idx (asset.nonce) for the asset + // and use them to fetch transactions + let stmt = asset::Entity::find() + .distinct_on([(asset::Entity, asset::Column::Id)]) + .filter(asset::Column::Id.eq(asset_id)) + .order_by(asset::Column::Id, Order::Desc) + .limit(1); + let asset = stmt.one(conn).await?; + if let Some(asset) = asset { + let tree = asset + .tree_id + .ok_or(DbErr::RecordNotFound("Tree not found".to_string()))?; + if tree.is_empty() { + return Err(DbErr::Custom("Empty tree for asset".to_string())); + } + let leaf_idx = asset + .nonce + .ok_or(DbErr::RecordNotFound("Leaf ID does not exist".to_string()))?; + let transactions = + fetch_transactions(conn, tree, leaf_idx, pagination, limit, sort_direction).await?; + Ok(transactions) + } else { + Ok(Vec::new()) + } +} + fn filter_out_stale_creators(creators: &mut Vec) { // If the first creator is an empty Vec, it means the creator array is empty (which is allowed // for compressed assets in Bubblegum). diff --git a/digital_asset_types/src/dapi/common/asset.rs b/digital_asset_types/src/dapi/common/asset.rs index 78c91b690..655eefdc4 100644 --- a/digital_asset_types/src/dapi/common/asset.rs +++ b/digital_asset_types/src/dapi/common/asset.rs @@ -5,6 +5,7 @@ use crate::dao::Pagination; use crate::dao::{asset, asset_authority, asset_creators, asset_data, asset_grouping}; use crate::rpc::filter::{AssetSortBy, AssetSortDirection, AssetSorting}; use crate::rpc::options::Options; +use crate::rpc::response::TransactionSignatureList; use crate::rpc::response::{AssetError, AssetList}; use crate::rpc::{ Asset as RpcAsset, Authority, Compression, Content, Creator, File, Group, Interface, @@ -83,6 +84,31 @@ pub fn build_asset_response( } } +pub fn build_transaction_signatures_response( + items: Vec<(String, String)>, + limit: u64, + pagination: &Pagination, +) -> TransactionSignatureList { + let total = items.len() as u32; + let (page, before, after) = match pagination { + Pagination::Keyset { before, after } => { + let bef = before.clone().and_then(|x| String::from_utf8(x).ok()); + let aft = after.clone().and_then(|x| String::from_utf8(x).ok()); + (None, bef, aft) + } + Pagination::Page { page } => (Some(*page), None, None), + Pagination::Cursor { .. 
} => (None, None, None), + }; + TransactionSignatureList { + total, + limit: limit as u32, + page: page.map(|x| x as u32), + before, + after, + items, + } +} + pub fn create_sorting(sorting: AssetSorting) -> (sea_orm::query::Order, Option) { let sort_column = match sorting.sort_by { AssetSortBy::Id => Some(asset::Column::Id), diff --git a/digital_asset_types/src/dapi/get_asset_signatures.rs b/digital_asset_types/src/dapi/get_asset_signatures.rs new file mode 100644 index 000000000..79250d911 --- /dev/null +++ b/digital_asset_types/src/dapi/get_asset_signatures.rs @@ -0,0 +1,35 @@ +use crate::dao::scopes; +use crate::dao::PageOptions; + +use crate::rpc::filter::AssetSortDirection; +use crate::rpc::response::TransactionSignatureList; +use sea_orm::DatabaseConnection; +use sea_orm::DbErr; + +use super::common::{build_transaction_signatures_response, create_pagination}; + +pub async fn get_asset_signatures( + db: &DatabaseConnection, + asset_id: Option>, + tree: Option>, + leaf_idx: Option, + page_options: PageOptions, + sort_direction: Option, +) -> Result { + let pagination = create_pagination(&page_options)?; + let transactions = scopes::asset::get_asset_signatures( + db, + asset_id, + tree, + leaf_idx, + &pagination, + page_options.limit, + sort_direction, + ) + .await?; + Ok(build_transaction_signatures_response( + transactions, + page_options.limit, + &pagination, + )) +} diff --git a/digital_asset_types/src/dapi/mod.rs b/digital_asset_types/src/dapi/mod.rs index efe5deee3..e9481169a 100644 --- a/digital_asset_types/src/dapi/mod.rs +++ b/digital_asset_types/src/dapi/mod.rs @@ -3,13 +3,17 @@ mod assets_by_creator; mod assets_by_group; mod assets_by_owner; mod change_logs; -pub mod common; mod get_asset; +mod get_asset_signatures; mod search_assets; + +pub mod common; + pub use assets_by_authority::*; pub use assets_by_creator::*; pub use assets_by_group::*; pub use assets_by_owner::*; pub use change_logs::*; pub use get_asset::*; +pub use get_asset_signatures::*; pub use search_assets::*; diff --git a/digital_asset_types/src/rpc/response.rs b/digital_asset_types/src/rpc/response.rs index 3945f6955..53076c8b2 100644 --- a/digital_asset_types/src/rpc/response.rs +++ b/digital_asset_types/src/rpc/response.rs @@ -36,3 +36,17 @@ pub struct AssetList { #[serde(skip_serializing_if = "Vec::is_empty")] pub errors: Vec, } + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Default, JsonSchema)] +#[serde(default)] +pub struct TransactionSignatureList { + pub total: u32, + pub limit: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub page: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub before: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub after: Option, + pub items: Vec<(String, String)>, +} diff --git a/migration/src/lib.rs b/migration/src/lib.rs index c54bdc81a..1dc9ac22d 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -32,6 +32,9 @@ mod m20230918_182123_add_raw_name_symbol; mod m20230919_072154_cl_audits; mod m20231019_120101_add_seq_numbers_bgum_update_metadata; mod m20231206_120101_remove_was_decompressed; +mod m20240104_203133_add_cl_audits_v2; +mod m20240104_203328_remove_cl_audits; +mod m20240116_130744_add_update_metadata_ix; pub struct Migrator; @@ -71,6 +74,9 @@ impl MigratorTrait for Migrator { Box::new(m20230919_072154_cl_audits::Migration), Box::new(m20231019_120101_add_seq_numbers_bgum_update_metadata::Migration), Box::new(m20231206_120101_remove_was_decompressed::Migration), + 
Box::new(m20240104_203133_add_cl_audits_v2::Migration), + Box::new(m20240104_203328_remove_cl_audits::Migration), + Box::new(m20240116_130744_add_update_metadata_ix::Migration), ] } } diff --git a/migration/src/m20230919_072154_cl_audits.rs b/migration/src/m20230919_072154_cl_audits.rs index a2a75e338..172c290d6 100644 --- a/migration/src/m20230919_072154_cl_audits.rs +++ b/migration/src/m20230919_072154_cl_audits.rs @@ -45,7 +45,7 @@ impl MigrationTrait for Migration { /// Learn more at https://docs.rs/sea-query#iden #[derive(Iden)] -enum ClAudits { +pub enum ClAudits { Table, Id, Tree, diff --git a/migration/src/m20240104_203133_add_cl_audits_v2.rs b/migration/src/m20240104_203133_add_cl_audits_v2.rs new file mode 100644 index 000000000..ed1561ff2 --- /dev/null +++ b/migration/src/m20240104_203133_add_cl_audits_v2.rs @@ -0,0 +1,109 @@ +use enum_iterator::{all, Sequence}; +use sea_orm_migration::prelude::*; +use sea_orm_migration::sea_orm::{ConnectionTrait, DatabaseBackend, Statement}; +use sea_orm_migration::sea_query::extension::postgres::Type; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_type( + Type::create() + .as_enum(ClAuditsV2::Instruction) + .values(all::().map(|e| e).collect::>()) + .to_owned(), + ) + .await?; + manager + .create_table( + Table::create() + .table(ClAuditsV2::Table) + .if_not_exists() + .col( + ColumnDef::new(ClAuditsV2::Id) + .big_integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col(ColumnDef::new(ClAuditsV2::Tree).binary().not_null()) + .col(ColumnDef::new(ClAuditsV2::LeafIdx).big_integer().not_null()) + .col(ColumnDef::new(ClAuditsV2::Seq).big_integer().not_null()) + .col( + ColumnDef::new(ClAuditsV2::CreatedAt) + .date_time() + .default(SimpleExpr::Keyword(Keyword::CurrentTimestamp)) + .not_null(), + ) + .col(ColumnDef::new(ClAuditsV2::Tx).binary().not_null()) + .col( + ColumnDef::new(ClAuditsV2::Instruction) + .enumeration( + ClAuditsV2::Instruction, + all::().map(|e| e).collect::>(), + ) + .not_null(), + ) + .to_owned(), + ) + .await?; + + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + "ALTER TABLE cl_audits_v2 ADD CONSTRAINT unique_tree_leafidx_seq UNIQUE (tree, leaf_idx, seq);".to_string(), + )) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + "ALTER TABLE cl_audits DROP CONSTRAINT unique_tree_leafidx_seq_tx;".to_string(), + )) + .await?; + manager + .drop_table(Table::drop().table(ClAuditsV2::Table).to_owned()) + .await?; + Ok(()) + } +} + +/// Learn more at https://docs.rs/sea-query#iden +#[derive(Iden)] +pub enum ClAuditsV2 { + Table, + Id, + Tree, + LeafIdx, + Seq, + CreatedAt, + Tx, + Instruction, +} + +#[derive(Iden, Debug, PartialEq, Sequence)] +enum BubblegumInstruction { + Unknown, + MintV1, + Redeem, + CancelRedeem, + Transfer, + Delegate, + DecompressV1, + Compress, + Burn, + VerifyCreator, + UnverifyCreator, + VerifyCollection, + UnverifyCollection, + SetAndVerifyCollection, + MintToCollectionV1, +} diff --git a/migration/src/m20240104_203328_remove_cl_audits.rs b/migration/src/m20240104_203328_remove_cl_audits.rs new file mode 100644 index 000000000..c0e115edb --- /dev/null +++ b/migration/src/m20240104_203328_remove_cl_audits.rs @@ -0,0 +1,57 @@ +use 
sea_orm::Statement; +use sea_orm_migration::prelude::*; +use sea_orm_migration::sea_orm::{ConnectionTrait, DatabaseBackend}; + +use crate::m20230919_072154_cl_audits::ClAudits; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + DROP TABLE IF EXISTS cl_audits; + " + .to_string(), + )) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(ClAudits::Table) + .if_not_exists() + .col( + ColumnDef::new(ClAudits::Id) + .big_integer() + .not_null() + .auto_increment(), + ) + .col(ColumnDef::new(ClAudits::Tree).binary().not_null()) + .col(ColumnDef::new(ClAudits::NodeIdx).big_integer().not_null()) + .col(ColumnDef::new(ClAudits::LeafIdx).big_integer()) + .col(ColumnDef::new(ClAudits::Seq).big_integer().not_null()) + .col(ColumnDef::new(ClAudits::Level).big_integer().not_null()) + .col(ColumnDef::new(ClAudits::Hash).binary().not_null()) + .col( + ColumnDef::new(ClAudits::CreatedAt) + .date_time() + .default(SimpleExpr::Keyword(Keyword::CurrentTimestamp)) + .not_null(), + ) + .col(ColumnDef::new(ClAudits::Tx).string().not_null()) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/migration/src/m20240116_130744_add_update_metadata_ix.rs b/migration/src/m20240116_130744_add_update_metadata_ix.rs new file mode 100644 index 000000000..8f5a3bcef --- /dev/null +++ b/migration/src/m20240116_130744_add_update_metadata_ix.rs @@ -0,0 +1,24 @@ +use crate::m20240104_203133_add_cl_audits_v2::ClAuditsV2; +use sea_orm_migration::{prelude::*, sea_query::extension::postgres::Type}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_type( + Type::alter() + .name(ClAuditsV2::Instruction) + .add_value(Alias::new("update_metadata")) + .to_owned(), + ) + .await + } + + async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> { + // cannot rollback altering a type + Ok(()) + } +} diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs index ef6c1dc7c..11d4e0500 100644 --- a/nft_ingester/src/program_transformers/bubblegum/burn.rs +++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs @@ -17,13 +17,15 @@ pub async fn burn<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let Some(cl) = &parsing_result.tree_update { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; let leaf_index = cl.index; let (asset_id, _) = Pubkey::find_program_address( &[ diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs index d1a91ae45..8491163b2 100644 --- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs @@ -15,13 +15,15 @@ pub async fn cancel_redeem<'c, T>( parsing_result: 
&BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; match le.schema { LeafSchema::V1 { id, diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index c0397687a..c35be49c2 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -13,6 +13,7 @@ pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result<(), IngesterError> where @@ -37,7 +38,8 @@ where "Handling collection verification event for {} (verify: {}): {}", collection, verify, bundle.txn_id ); - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; let id_bytes = match le.schema { LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), }; diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index d144ce17c..2a254a2b8 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -17,6 +17,7 @@ pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result<(), IngesterError> where @@ -57,7 +58,8 @@ where "Handling creator verification event for creator {} (verify: {}): {}", creator, verify, bundle.txn_id ); - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; match le.schema { LeafSchema::V1 { diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 61c7ab600..d9b6b23b9 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,13 +1,13 @@ use crate::error::IngesterError; use digital_asset_types::dao::{ - asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items, cl_audits, - cl_items, + asset, asset_authority, asset_creators, asset_data, asset_grouping, backfill_items, + cl_audits_v2, cl_items, sea_orm_active_enums::{ - ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, - SpecificationVersions, + ChainMutability, Instruction, Mutability, OwnerType, RoyaltyTargetType, + SpecificationAssetClass, SpecificationVersions, }, }; -use log::{debug, info}; +use log::{debug, error, info}; use mpl_bubblegum::types::{Collection, Creator}; use sea_orm::{ query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, @@ -20,12 +20,13 @@ pub async fn save_changelog_event<'c, T>( slot: u64, txn_id: &str, txn: &T, + 
instruction: &str, cl_audits: bool, ) -> Result where T: ConnectionTrait + TransactionTrait, { - insert_change_log(change_log_event, slot, txn_id, txn, cl_audits).await?; + insert_change_log(change_log_event, slot, txn_id, txn, instruction, cl_audits).await?; Ok(change_log_event.seq) } @@ -38,6 +39,7 @@ pub async fn insert_change_log<'c, T>( slot: u64, txn_id: &str, txn: &T, + instruction: &str, cl_audits: bool, ) -> Result<(), IngesterError> where @@ -49,12 +51,13 @@ where for p in change_log_event.path.iter() { let node_idx = p.index as i64; debug!( - "seq {}, index {} level {}, node {:?}, txn: {:?}", + "seq {}, index {} level {}, node {:?}, txn: {:?}, instruction {}", change_log_event.seq, p.index, i, bs58::encode(p.node).into_string(), txn_id, + instruction ); let leaf_idx = if i == 0 { Some(node_idx_to_leaf_idx(node_idx, depth as u32)) @@ -72,14 +75,6 @@ where ..Default::default() }; - let audit_item: Option = if cl_audits { - let mut ai: cl_audits::ActiveModel = item.clone().into(); - ai.tx = Set(txn_id.to_string()); - Some(ai) - } else { - None - }; - i += 1; let mut query = cl_items::Entity::insert(item) .on_conflict( @@ -97,10 +92,41 @@ where txn.execute(query) .await .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + } - // Insert the audit item after the insert into cl_items have been completed - if let Some(audit_item) = audit_item { - cl_audits::Entity::insert(audit_item).exec(txn).await?; + // Insert the audit item after the insert into cl_items have been completed + if cl_audits { + let tx_id_bytes = bs58::decode(txn_id) + .into_vec() + .map_err(|_e| IngesterError::ChangeLogEventMalformed)?; + let ix = Instruction::from_str(instruction); + if ix == Instruction::Unknown { + error!("Unknown instruction: {}", instruction); + } + let audit_item_v2 = cl_audits_v2::ActiveModel { + tree: Set(tree_id.to_vec()), + leaf_idx: Set(change_log_event.index as i64), + seq: Set(change_log_event.seq as i64), + tx: Set(tx_id_bytes), + instruction: Set(ix), + ..Default::default() + }; + let query = cl_audits_v2::Entity::insert(audit_item_v2) + .on_conflict( + OnConflict::columns([ + cl_audits_v2::Column::Tree, + cl_audits_v2::Column::LeafIdx, + cl_audits_v2::Column::Seq, + ]) + .do_nothing() + .to_owned(), + ) + .build(DbBackend::Postgres); + match txn.execute(query).await { + Ok(_) => {} + Err(e) => { + error!("Error while inserting into cl_audits_v2: {:?}", e); + } } } @@ -136,7 +162,6 @@ where } Ok(()) - //TODO -> set maximum size of path and break into multiple statements } #[allow(clippy::too_many_arguments)] diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs index 4cb4aadc2..8df0de3d8 100644 --- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs +++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs @@ -15,13 +15,15 @@ pub async fn delegate<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; match le.schema { LeafSchema::V1 { id, diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs 
b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index 05236aa2e..25a69d472 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -29,6 +29,7 @@ pub async fn mint_v1<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result, IngesterError> where @@ -47,7 +48,8 @@ where &parsing_result.tree_update, &parsing_result.payload, ) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; let metadata = args; #[allow(unreachable_patterns)] return match le.schema { diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs index ea8ef00a0..4a4484f12 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs @@ -60,42 +60,44 @@ where match ix_type { InstructionName::Transfer => { - transfer::transfer(parsing_result, bundle, txn, cl_audits).await?; + transfer::transfer(parsing_result, bundle, txn, ix_str, cl_audits).await?; } InstructionName::Burn => { - burn::burn(parsing_result, bundle, txn, cl_audits).await?; + burn::burn(parsing_result, bundle, txn, ix_str, cl_audits).await?; } InstructionName::Delegate => { - delegate::delegate(parsing_result, bundle, txn, cl_audits).await?; + delegate::delegate(parsing_result, bundle, txn, ix_str, cl_audits).await?; } InstructionName::MintV1 | InstructionName::MintToCollectionV1 => { - let task = mint_v1::mint_v1(parsing_result, bundle, txn, cl_audits).await?; + let task = mint_v1::mint_v1(parsing_result, bundle, txn, ix_str, cl_audits).await?; if let Some(t) = task { task_manager.send(t)?; } } InstructionName::Redeem => { - redeem::redeem(parsing_result, bundle, txn, cl_audits).await?; + redeem::redeem(parsing_result, bundle, txn, ix_str, cl_audits).await?; } InstructionName::CancelRedeem => { - cancel_redeem::cancel_redeem(parsing_result, bundle, txn, cl_audits).await?; + cancel_redeem::cancel_redeem(parsing_result, bundle, txn, ix_str, cl_audits).await?; } InstructionName::DecompressV1 => { debug!("No action necessary for decompression") } InstructionName::VerifyCreator | InstructionName::UnverifyCreator => { - creator_verification::process(parsing_result, bundle, txn, cl_audits).await?; + creator_verification::process(parsing_result, bundle, txn, ix_str, cl_audits).await?; } InstructionName::VerifyCollection | InstructionName::UnverifyCollection | InstructionName::SetAndVerifyCollection => { - collection_verification::process(parsing_result, bundle, txn, cl_audits).await?; + collection_verification::process(parsing_result, bundle, txn, ix_str, cl_audits) + .await?; } InstructionName::SetDecompressibleState => (), // Nothing to index. 
InstructionName::UpdateMetadata => { let task = - update_metadata::update_metadata(parsing_result, bundle, txn, cl_audits).await?; + update_metadata::update_metadata(parsing_result, bundle, txn, ix_str, cl_audits) + .await?; if let Some(t) = task { task_manager.send(t)?; diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs index 484f5cdc9..8d1944412 100644 --- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs @@ -14,13 +14,15 @@ pub async fn redeem<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let Some(cl) = &parsing_result.tree_update { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; let leaf_index = cl.index; let (asset_id, _) = Pubkey::find_program_address( &[ diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs index 230167991..42351df2b 100644 --- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs +++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs @@ -16,14 +16,15 @@ pub async fn transfer<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; match le.schema { LeafSchema::V1 { id, diff --git a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs index 2f1823303..54f1b87e0 100644 --- a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs +++ b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs @@ -27,6 +27,7 @@ pub async fn update_metadata<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + instruction: &str, cl_audits: bool, ) -> Result, IngesterError> where @@ -45,7 +46,8 @@ where &parsing_result.tree_update, &parsing_result.payload, ) { - let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; + let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction, cl_audits) + .await?; #[allow(unreachable_patterns)] return match le.schema {
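
For reference, here is a minimal, hypothetical client-side sketch of calling the endpoint this patch adds. It is not part of the patch: the server URL and asset id are placeholders, and it assumes `reqwest` (with the "json" feature), `tokio`, and `serde_json` as client dependencies. Method and parameter names come from the `GetAssetSignatures` payload and the alias registered in builder.rs above.

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        // "getSignaturesForAsset" is registered as an alias of "getAssetSignatures" in builder.rs.
        "method": "getSignaturesForAsset",
        "params": {
            // Either "id", or both "tree" and "leafIndex", must be provided (see api_impl.rs).
            "id": "ASSET_PUBKEY_BASE58",   // placeholder asset id
            "limit": 100,
            "page": 1
        }
    });
    let response: serde_json::Value = client
        .post("http://localhost:9090")     // placeholder DAS API URL
        .json(&request)
        .send()
        .await?
        .json()
        .await?;
    // `result.items` is a list of (signature, instruction) pairs backed by cl_audits_v2,
    // e.g. ["3nE9...", "MintV1"]; the sort order defaults to descending by seq,
    // mirroring getSignaturesForAddress.
    println!("{}", serde_json::to_string_pretty(&response)?);
    Ok(())
}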