From 8f82f424c799466810590e46a36e4cd677ad6418 Mon Sep 17 00:00:00 2001 From: Michael Danenberg <56533526+danenbm@users.noreply.github.com> Date: Wed, 26 Jul 2023 16:43:58 -0700 Subject: [PATCH 1/2] Enable out-of-order transaction processing for `asset` table (#77) * Fix docker preparation script to build SPL * Update owner and delegate in asset table when collection or creator verification occurs * Modify program transformers to upsert in asset table * This allows out-of-order Bubblegum transactions to create and update the asset table. * Upsert leaf schema, owner, delegate, and seq separately since those are updated by all instructions and gated by sequence number to ensure freshest value. * Mint, burn, and decompress happen without regard to sequence number because they operate on unique fields. * Mint and burn have been updated but Decompress still needs to be fixed to handle out of order transactions. * Also remove unused 'filling' variable. * Update mint and decompress to be able to upsert asset info out of order * Add second sequence number for compression status fields * Reduce logging in docker * Comment out compressed_seq before regenerating Sea ORM objects * Add migration for asset specification * Update README * Rename PNFT and regenerate Sea ORM types * Apply usage of compressed_seq after regenerating Sea ORM types * Add owner delegate sequence number for owner and delegate fields. Also remove not null constraints for asset fields without defaults. * Regenerating database types * Update handling for non null constrained asset table * Update tests to use new Sea ORM types * Use owner_and_delegate_seq to separate upserts Also update redeem and decompress to not use leaf schema events. * Adding was_decompressed flag to replace compressed_seq compressed_seq won't work because decompression doesn't create a cl_event. 
* Regenerating Sea ORM types * Update code to use was_decompressed flag * Fix new boolean SQL conditions * Update comment * Remove column updates in asset table during mint for items not in model * Clippy fixes in ingester main * Cleanup debug comment * Allow for sequence number to be NULL (needed after decompress now) * Add leaf specific sequence number to protect that field in asset table * Revert "Allow for sequence number to be NULL (needed after decompress now)" This reverts commit 2713a18ad2ddfac1944a9585774c45706933bf89. * Update nft_ingester/src/program_transformers/bubblegum/redeem.rs Co-authored-by: Nicolas Pennie --------- Co-authored-by: Nicolas Pennie --- README.md | 4 + .../src/dao/generated/asset.rs | 29 ++- .../src/dao/generated/sea_orm_active_enums.rs | 120 ++++++------ digital_asset_types/src/dapi/common/asset.rs | 44 +++-- digital_asset_types/tests/common.rs | 17 +- digital_asset_types/tests/get_asset_by_id.rs | 4 +- .../tests/get_assets_by_authority.rs | 12 +- .../tests/get_assets_by_creator.rs | 12 +- .../tests/get_assets_by_group.rs | 16 +- .../tests/get_assets_by_owner.rs | 12 +- digital_asset_types/tests/json_parsing.rs | 1 - docker-compose.yaml | 2 +- migration/src/lib.rs | 10 + ...0230224_093722_performance_improvements.rs | 2 +- ...0101_add_owner_delegate_sequence_number.rs | 36 ++++ .../src/m20230601_120101_add_pnft_enum_val.rs | 29 +++ ...15_120101_remove_asset_null_constraints.rs | 50 +++++ .../m20230620_120101_add_was_decompressed.rs | 41 ++++ ...0230623_120101_add_leaf_sequence_number.rs | 36 ++++ nft_ingester/src/main.rs | 38 ++-- .../program_transformers/bubblegum/burn.rs | 48 +++-- .../bubblegum/cancel_redeem.rs | 46 +++-- .../bubblegum/collection_verification.rs | 61 ++++-- .../bubblegum/creator_verification.rs | 54 ++++-- .../src/program_transformers/bubblegum/db.rs | 180 ++++++++++++++---- .../bubblegum/decompress.rs | 52 ++--- .../bubblegum/delegate.rs | 46 +++-- .../program_transformers/bubblegum/mint_v1.rs | 89 +++++++-- 
.../src/program_transformers/bubblegum/mod.rs | 8 + .../program_transformers/bubblegum/redeem.rs | 69 ++++--- .../bubblegum/transfer.rs | 49 +++-- .../token_metadata/master_edition.rs | 2 +- .../token_metadata/v1_asset.rs | 12 +- prepare-local-docker-env.sh | 5 - 34 files changed, 866 insertions(+), 370 deletions(-) create mode 100644 migration/src/m20230526_120101_add_owner_delegate_sequence_number.rs create mode 100644 migration/src/m20230601_120101_add_pnft_enum_val.rs create mode 100644 migration/src/m20230615_120101_remove_asset_null_constraints.rs create mode 100644 migration/src/m20230620_120101_add_was_decompressed.rs create mode 100644 migration/src/m20230623_120101_add_leaf_sequence_number.rs diff --git a/README.md b/README.md index 6d48d019e..2784bcad9 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,10 @@ Then with a local `DATABASE_URL` var exported like this `export DATABASE_URL=pos If you need to install `sea-orm-cli` run `cargo install sea-orm-cli`. +Note: The current SeaORM types were generated using version 0.9.3 so unless you want to upgrade you can install using `cargo install sea-orm-cli --version 0.9.3`. + +Also note: The migration `m20230224_093722_performance_improvements` needs to be commented out of the migration lib.rs in order for the Sea ORM `Relations` to generate correctly. 
+ #### Developing Locally *Prerequisites* * A Postgres Server running with the database setup according to ./init.sql diff --git a/digital_asset_types/src/dao/generated/asset.rs b/digital_asset_types/src/dao/generated/asset.rs index 0afb21fab..642ccf345 100644 --- a/digital_asset_types/src/dao/generated/asset.rs +++ b/digital_asset_types/src/dao/generated/asset.rs @@ -20,8 +20,8 @@ impl EntityName for Entity { pub struct Model { pub id: Vec, pub alt_id: Option>, - pub specification_version: SpecificationVersions, - pub specification_asset_class: SpecificationAssetClass, + pub specification_version: Option, + pub specification_asset_class: Option, pub owner: Option>, pub owner_type: OwnerType, pub delegate: Option>, @@ -30,19 +30,22 @@ pub struct Model { pub supply_mint: Option>, pub compressed: bool, pub compressible: bool, - pub seq: i64, + pub seq: Option, pub tree_id: Option>, pub leaf: Option>, - pub nonce: i64, + pub nonce: Option, pub royalty_target_type: RoyaltyTargetType, pub royalty_target: Option>, pub royalty_amount: i32, pub asset_data: Option>, pub created_at: Option, pub burnt: bool, - pub slot_updated: i64, + pub slot_updated: Option, pub data_hash: Option, pub creator_hash: Option, + pub owner_delegate_seq: Option, + pub was_decompressed: bool, + pub leaf_seq: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -72,6 +75,9 @@ pub enum Column { SlotUpdated, DataHash, CreatorHash, + OwnerDelegateSeq, + WasDecompressed, + LeafSeq, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -101,8 +107,8 @@ impl ColumnTrait for Column { match self { Self::Id => ColumnType::Binary.def(), Self::AltId => ColumnType::Binary.def().null(), - Self::SpecificationVersion => SpecificationVersions::db_type(), - Self::SpecificationAssetClass => SpecificationAssetClass::db_type(), + Self::SpecificationVersion => SpecificationVersions::db_type().null(), + Self::SpecificationAssetClass => SpecificationAssetClass::db_type().null(), Self::Owner => 
ColumnType::Binary.def().null(), Self::OwnerType => OwnerType::db_type(), Self::Delegate => ColumnType::Binary.def().null(), @@ -111,19 +117,22 @@ impl ColumnTrait for Column { Self::SupplyMint => ColumnType::Binary.def().null(), Self::Compressed => ColumnType::Boolean.def(), Self::Compressible => ColumnType::Boolean.def(), - Self::Seq => ColumnType::BigInteger.def(), + Self::Seq => ColumnType::BigInteger.def().null(), Self::TreeId => ColumnType::Binary.def().null(), Self::Leaf => ColumnType::Binary.def().null(), - Self::Nonce => ColumnType::BigInteger.def(), + Self::Nonce => ColumnType::BigInteger.def().null(), Self::RoyaltyTargetType => RoyaltyTargetType::db_type(), Self::RoyaltyTarget => ColumnType::Binary.def().null(), Self::RoyaltyAmount => ColumnType::Integer.def(), Self::AssetData => ColumnType::Binary.def().null(), Self::CreatedAt => ColumnType::TimestampWithTimeZone.def().null(), Self::Burnt => ColumnType::Boolean.def(), - Self::SlotUpdated => ColumnType::BigInteger.def(), + Self::SlotUpdated => ColumnType::BigInteger.def().null(), Self::DataHash => ColumnType::Char(Some(50u32)).def().null(), Self::CreatorHash => ColumnType::Char(Some(50u32)).def().null(), + Self::OwnerDelegateSeq => ColumnType::BigInteger.def().null(), + Self::WasDecompressed => ColumnType::Boolean.def(), + Self::LeafSeq => ColumnType::BigInteger.def().null(), } } } diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index 48fb34b5e..5fb406acc 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -4,52 +4,8 @@ use sea_orm::entity::prelude::*; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "specification_versions" -)] -pub enum SpecificationVersions { - 
#[sea_orm(string_value = "unknown")] - Unknown, - #[sea_orm(string_value = "v0")] - V0, - #[sea_orm(string_value = "v1")] - V1, - #[sea_orm(string_value = "v2")] - V2, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "royalty_target_type" -)] -pub enum RoyaltyTargetType { - #[sea_orm(string_value = "creators")] - Creators, - #[sea_orm(string_value = "fanout")] - Fanout, - #[sea_orm(string_value = "single")] - Single, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] -pub enum TaskStatus { - #[sea_orm(string_value = "failed")] - Failed, - #[sea_orm(string_value = "pending")] - Pending, - #[sea_orm(string_value = "running")] - Running, - #[sea_orm(string_value = "success")] - Success, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] -pub enum ChainMutability { +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")] +pub enum Mutability { #[sea_orm(string_value = "immutable")] Immutable, #[sea_orm(string_value = "mutable")] @@ -58,18 +14,8 @@ pub enum ChainMutability { Unknown, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")] -pub enum OwnerType { - #[sea_orm(string_value = "single")] - Single, - #[sea_orm(string_value = "token")] - Token, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "mutability")] -pub enum Mutability { +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = 
"chain_mutability")] +pub enum ChainMutability { #[sea_orm(string_value = "immutable")] Immutable, #[sea_orm(string_value = "mutable")] @@ -96,6 +42,18 @@ pub enum V1AccountAttachments { Unknown, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "task_status")] +pub enum TaskStatus { + #[sea_orm(string_value = "failed")] + Failed, + #[sea_orm(string_value = "pending")] + Pending, + #[sea_orm(string_value = "running")] + Running, + #[sea_orm(string_value = "success")] + Success, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -116,10 +74,52 @@ pub enum SpecificationAssetClass { Print, #[sea_orm(string_value = "PRINTABLE_NFT")] PrintableNft, + #[sea_orm(string_value = "PROGRAMMABLE_NFT")] + ProgrammableNft, #[sea_orm(string_value = "TRANSFER_RESTRICTED_NFT")] TransferRestrictedNft, #[sea_orm(string_value = "unknown")] Unknown, - #[sea_orm(string_value = "PNFT")] - ProgrammableNft, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "specification_versions" +)] +pub enum SpecificationVersions { + #[sea_orm(string_value = "unknown")] + Unknown, + #[sea_orm(string_value = "v0")] + V0, + #[sea_orm(string_value = "v1")] + V1, + #[sea_orm(string_value = "v2")] + V2, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "royalty_target_type" +)] +pub enum RoyaltyTargetType { + #[sea_orm(string_value = "creators")] + Creators, + #[sea_orm(string_value = "fanout")] + Fanout, + #[sea_orm(string_value = "single")] + Single, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] 
+#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")] +pub enum OwnerType { + #[sea_orm(string_value = "single")] + Single, + #[sea_orm(string_value = "token")] + Token, + #[sea_orm(string_value = "unknown")] + Unknown, } diff --git a/digital_asset_types/src/dapi/common/asset.rs b/digital_asset_types/src/dapi/common/asset.rs index d2251d868..917c2f105 100644 --- a/digital_asset_types/src/dapi/common/asset.rs +++ b/digital_asset_types/src/dapi/common/asset.rs @@ -1,7 +1,7 @@ use crate::dao::sea_orm_active_enums::SpecificationVersions; +use crate::dao::FullAsset; use crate::dao::Pagination; use crate::dao::{asset, asset_authority, asset_creators, asset_data, asset_grouping}; -use crate::dao::FullAsset; use crate::rpc::filter::{AssetSortBy, AssetSortDirection, AssetSorting}; use crate::rpc::response::{AssetError, AssetList}; @@ -221,9 +221,11 @@ pub fn v1_content_from_json(asset_data: &asset_data::Model) -> Result Result { match asset.specification_version { - SpecificationVersions::V1 => v1_content_from_json(data), - SpecificationVersions::V0 => v1_content_from_json(data), - _ => Err(DbErr::Custom("Version Not Implemented".to_string())), + Some(SpecificationVersions::V1) | Some(SpecificationVersions::V0) => { + v1_content_from_json(data) + } + Some(_) => Err(DbErr::Custom("Version Not Implemented".to_string())), + None => Err(DbErr::Custom("Specification version not found".to_string())), } } @@ -258,11 +260,19 @@ pub fn to_grouping(groups: Vec) -> Vec { .collect() } -pub fn get_interface(asset: &asset::Model) -> Interface { - Interface::from(( - &asset.specification_version, - &asset.specification_asset_class, - )) +pub fn get_interface(asset: &asset::Model) -> Result { + Ok(Interface::from(( + asset + .specification_version + .as_ref() + .ok_or(DbErr::Custom("Specification version not found".to_string()))?, + asset + .specification_asset_class + .as_ref() + .ok_or(DbErr::Custom( + "Specification asset class not found".to_string(), + ))?, + ))) 
} //TODO -> impl custom erro type @@ -277,15 +287,15 @@ pub fn asset_to_rpc(asset: FullAsset) -> Result { let rpc_authorities = to_authority(authorities); let rpc_creators = to_creators(creators); let rpc_groups = to_grouping(groups); - let interface = get_interface(&asset); + let interface = get_interface(&asset)?; let content = get_content(&asset, &data)?; let mut chain_data_selector_fn = jsonpath_lib::selector(&data.chain_data); let chain_data_selector = &mut chain_data_selector_fn; let basis_points = safe_select(chain_data_selector, "$.primary_sale_happened") .and_then(|v| v.as_bool()) .unwrap_or(false); - let edition_nonce = safe_select(chain_data_selector, "$.edition_nonce") - .and_then(|v| v.as_u64()); + let edition_nonce = + safe_select(chain_data_selector, "$.edition_nonce").and_then(|v| v.as_u64()); Ok(RpcAsset { interface: interface.clone(), @@ -296,8 +306,12 @@ pub fn asset_to_rpc(asset: FullAsset) -> Result { compression: Some(Compression { eligible: asset.compressible, compressed: asset.compressed, - leaf_id: asset.nonce, - seq: asset.seq, + leaf_id: asset + .nonce + .ok_or(DbErr::Custom("Nonce not found".to_string()))?, + seq: asset + .seq + .ok_or(DbErr::Custom("Seq not found".to_string()))?, tree: asset .tree_id .map(|s| bs58::encode(s).into_string()) @@ -337,7 +351,7 @@ pub fn asset_to_rpc(asset: FullAsset) -> Result { }, supply: match interface { Interface::V1NFT => Some(Supply { - edition_nonce: edition_nonce, + edition_nonce, print_current_supply: 0, print_max_supply: 0, }), diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index a052fe277..9bb28a006 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -86,6 +86,7 @@ pub fn create_asset_data( ) } +#[allow(clippy::too_many_arguments)] pub fn create_asset( id: Vec, owner: Vec, @@ -97,8 +98,8 @@ pub fn create_asset( compressed: bool, compressible: bool, tree_id: Option>, - specification_version: SpecificationVersions, - 
nonce: i64, + specification_version: Option, + nonce: Option, leaf: Option>, royalty_target_type: RoyaltyTargetType, royalty_target: Option>, @@ -134,7 +135,7 @@ pub fn create_asset( supply_mint, compressed, compressible, - seq: 0, + seq: Some(0), tree_id, specification_version, nonce, @@ -142,14 +143,17 @@ pub fn create_asset( royalty_target_type, royalty_target, royalty_amount, - asset_data: Some(id.clone()), + asset_data: Some(id), burnt: false, created_at: None, - specification_asset_class: SpecificationAssetClass::Nft, - slot_updated: 0, + specification_asset_class: Some(SpecificationAssetClass::Nft), + slot_updated: Some(0), data_hash: None, alt_id: None, creator_hash: None, + owner_delegate_seq: Some(0), + was_decompressed: false, + leaf_seq: Some(0), }, ) } @@ -204,6 +208,7 @@ pub fn create_asset_authority( ) } +#[allow(dead_code)] pub fn create_asset_grouping( asset_id: Vec, collection: Pubkey, diff --git a/digital_asset_types/tests/get_asset_by_id.rs b/digital_asset_types/tests/get_asset_by_id.rs index 956dcdf1f..3a5bf14c6 100644 --- a/digital_asset_types/tests/get_asset_by_id.rs +++ b/digital_asset_types/tests/get_asset_by_id.rs @@ -50,8 +50,8 @@ async fn get_asset_by_id() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, diff --git a/digital_asset_types/tests/get_assets_by_authority.rs b/digital_asset_types/tests/get_assets_by_authority.rs index 1a94af0be..921bf7176 100644 --- a/digital_asset_types/tests/get_assets_by_authority.rs +++ b/digital_asset_types/tests/get_assets_by_authority.rs @@ -61,8 +61,8 @@ async fn get_assets_by_owner() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -114,8 +114,8 @@ async fn get_assets_by_owner() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 
as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -174,8 +174,8 @@ async fn get_assets_by_owner() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, diff --git a/digital_asset_types/tests/get_assets_by_creator.rs b/digital_asset_types/tests/get_assets_by_creator.rs index 0e87e9da9..c282be6ad 100644 --- a/digital_asset_types/tests/get_assets_by_creator.rs +++ b/digital_asset_types/tests/get_assets_by_creator.rs @@ -64,8 +64,8 @@ async fn get_assets_by_creator() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -117,8 +117,8 @@ async fn get_assets_by_creator() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -177,8 +177,8 @@ async fn get_assets_by_creator() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, diff --git a/digital_asset_types/tests/get_assets_by_group.rs b/digital_asset_types/tests/get_assets_by_group.rs index 294dc96f1..7788a1041 100644 --- a/digital_asset_types/tests/get_assets_by_group.rs +++ b/digital_asset_types/tests/get_assets_by_group.rs @@ -66,8 +66,8 @@ async fn get_assets_by_group() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -119,8 +119,8 @@ async fn get_assets_by_group() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -141,7 +141,7 @@ async 
fn get_assets_by_group() -> Result<(), DbErr> { 2, ); - let asset_grouping_2 = create_asset_grouping(id_2.to_bytes().to_vec(), collection.clone(), 1); + let asset_grouping_2 = create_asset_grouping(id_2.to_bytes().to_vec(), collection, 1); let metadata_3 = MockMetadataArgs { name: String::from("Test #3"), @@ -181,8 +181,8 @@ async fn get_assets_by_group() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -211,7 +211,7 @@ async fn get_assets_by_group() -> Result<(), DbErr> { 3, ); - let asset_grouping_3 = create_asset_grouping(id_3.to_bytes().to_vec(), collection.clone(), 2); + let asset_grouping_3 = create_asset_grouping(id_3.to_bytes().to_vec(), collection, 2); let db = MockDatabase::new(DatabaseBackend::Postgres) .append_query_results(vec![vec![asset_data_1.1]]) diff --git a/digital_asset_types/tests/get_assets_by_owner.rs b/digital_asset_types/tests/get_assets_by_owner.rs index 1a94af0be..921bf7176 100644 --- a/digital_asset_types/tests/get_assets_by_owner.rs +++ b/digital_asset_types/tests/get_assets_by_owner.rs @@ -61,8 +61,8 @@ async fn get_assets_by_owner() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -114,8 +114,8 @@ async fn get_assets_by_owner() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, @@ -174,8 +174,8 @@ async fn get_assets_by_owner() -> Result<(), DbErr> { true, false, None, - SpecificationVersions::V1, - 0 as i64, + Some(SpecificationVersions::V1), + Some(0_i64), None, RoyaltyTargetType::Creators, None, diff --git a/digital_asset_types/tests/json_parsing.rs b/digital_asset_types/tests/json_parsing.rs index d72c04989..8db326a1a 100644 --- 
a/digital_asset_types/tests/json_parsing.rs +++ b/digital_asset_types/tests/json_parsing.rs @@ -8,7 +8,6 @@ use digital_asset_types::rpc::Content; use digital_asset_types::rpc::File; use solana_sdk::signature::Keypair; use solana_sdk::signer::Signer; -use tokio; pub async fn test_json(uri: String) -> Content { let body: serde_json::Value = reqwest::get(&uri).await.unwrap().json().await.unwrap(); diff --git a/docker-compose.yaml b/docker-compose.yaml index 21d839684..d910d8ce7 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -95,7 +95,7 @@ services: - ./ledger:/config:rw - ./solana-test-validator-geyser-config:/plugin-config:rw environment: - RUST_LOG: info + RUST_LOG: error PLUGIN_MESSENGER_CONFIG.messenger_type: "Redis" PLUGIN_MESSENGER_CONFIG.connection_config: '{redis_connection_str="redis://redis"}' ports: diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 5f39bba7f..6edefc669 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -16,6 +16,11 @@ mod m20230203_205959_improve_upsert_perf; mod m20230224_093722_performance_improvements; mod m20230310_162227_add_indexes_to_bg; mod m20230317_121944_remove_indexes_for_perf; +mod m20230526_120101_add_owner_delegate_sequence_number; +mod m20230601_120101_add_pnft_enum_val; +mod m20230615_120101_remove_asset_null_constraints; +mod m20230620_120101_add_was_decompressed; +mod m20230623_120101_add_leaf_sequence_number; pub struct Migrator; @@ -39,6 +44,11 @@ impl MigratorTrait for Migrator { Box::new(m20230224_093722_performance_improvements::Migration), Box::new(m20230310_162227_add_indexes_to_bg::Migration), Box::new(m20230317_121944_remove_indexes_for_perf::Migration), + Box::new(m20230526_120101_add_owner_delegate_sequence_number::Migration), + Box::new(m20230601_120101_add_pnft_enum_val::Migration), + Box::new(m20230615_120101_remove_asset_null_constraints::Migration), + Box::new(m20230620_120101_add_was_decompressed::Migration), + 
Box::new(m20230623_120101_add_leaf_sequence_number::Migration), ] } } diff --git a/migration/src/m20230224_093722_performance_improvements.rs b/migration/src/m20230224_093722_performance_improvements.rs index 980f8fba5..defa1d79e 100644 --- a/migration/src/m20230224_093722_performance_improvements.rs +++ b/migration/src/m20230224_093722_performance_improvements.rs @@ -135,7 +135,7 @@ impl MigrationTrait for Migration { Ok(()) } - async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> { Ok(()) } } diff --git a/migration/src/m20230526_120101_add_owner_delegate_sequence_number.rs b/migration/src/m20230526_120101_add_owner_delegate_sequence_number.rs new file mode 100644 index 000000000..825965141 --- /dev/null +++ b/migration/src/m20230526_120101_add_owner_delegate_sequence_number.rs @@ -0,0 +1,36 @@ +use digital_asset_types::dao::asset; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .add_column(ColumnDef::new(Alias::new("owner_delegate_seq")).big_integer()) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .drop_column(Alias::new("owner_delegate_seq")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/migration/src/m20230601_120101_add_pnft_enum_val.rs b/migration/src/m20230601_120101_add_pnft_enum_val.rs new file mode 100644 index 000000000..06fec368f --- /dev/null +++ b/migration/src/m20230601_120101_add_pnft_enum_val.rs @@ -0,0 +1,29 @@ +use sea_orm_migration::{ + 
prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + "ALTER TYPE specification_asset_class ADD VALUE IF NOT EXISTS 'PROGRAMMABLE_NFT';" + .to_string(), + )) + .await?; + + Ok(()) + } + + async fn down(&self, _manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + Ok(()) + } +} diff --git a/migration/src/m20230615_120101_remove_asset_null_constraints.rs b/migration/src/m20230615_120101_remove_asset_null_constraints.rs new file mode 100644 index 000000000..2fa440cd9 --- /dev/null +++ b/migration/src/m20230615_120101_remove_asset_null_constraints.rs @@ -0,0 +1,50 @@ +use sea_orm_migration::{ + prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset + ALTER COLUMN specification_version DROP NOT NULL, + ALTER COLUMN specification_asset_class DROP NOT NULL, + ALTER COLUMN seq DROP NOT NULL, + ALTER COLUMN nonce DROP NOT NULL, + ALTER COLUMN slot_updated DROP NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset + ALTER COLUMN specification_version SET NOT NULL, + ALTER COLUMN specification_asset_class SET NOT NULL, + ALTER COLUMN 
seq SET NOT NULL, + ALTER COLUMN nonce SET NOT NULL, + ALTER COLUMN slot_updated SET NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } +} diff --git a/migration/src/m20230620_120101_add_was_decompressed.rs b/migration/src/m20230620_120101_add_was_decompressed.rs new file mode 100644 index 000000000..270b0eb6d --- /dev/null +++ b/migration/src/m20230620_120101_add_was_decompressed.rs @@ -0,0 +1,41 @@ +use digital_asset_types::dao::asset; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .add_column( + ColumnDef::new(Alias::new("was_decompressed")) + .boolean() + .not_null() + .default(false), + ) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .drop_column(Alias::new("was_decompressed")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/migration/src/m20230623_120101_add_leaf_sequence_number.rs b/migration/src/m20230623_120101_add_leaf_sequence_number.rs new file mode 100644 index 000000000..b37314c77 --- /dev/null +++ b/migration/src/m20230623_120101_add_leaf_sequence_number.rs @@ -0,0 +1,36 @@ +use digital_asset_types::dao::asset; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset::Entity) + 
.add_column(ColumnDef::new(Alias::new("leaf_seq")).big_integer()) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset::Entity) + .drop_column(Alias::new("leaf_seq")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 18413f660..ff2ab7784 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -14,7 +14,7 @@ use crate::{ account_updates::account_worker, ack::ack_worker, backfiller::setup_backfiller, - config::{init_logger, setup_config, IngesterRole}, + config::{init_logger, rand_string, setup_config, IngesterRole}, database::setup_database, error::IngesterError, metrics::setup_metrics, @@ -22,31 +22,22 @@ use crate::{ tasks::{BgTask, DownloadMetadataTask, TaskManager}, transaction_notifications::transaction_worker, }; - -use crate::config::rand_string; use cadence_macros::{is_global_default_set, statsd_count}; use chrono::Duration; +use clap::{arg, command, value_parser}; use log::{error, info}; use plerkle_messenger::{ redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, TRANSACTION_STREAM, }; -use tokio::{ - signal, - task::{JoinSet}, -}; - -use std::{ - path::PathBuf, - time -}; -use clap::{arg, command, value_parser, ArgAction, Command}; +use std::{path::PathBuf, time}; +use tokio::{signal, task::JoinSet}; #[tokio::main(flavor = "multi_thread")] pub async fn main() -> Result<(), IngesterError> { init_logger(); info!("Starting nft_ingester"); - let matches = command!() + let matches = command!() .arg( arg!( -c --config "Sets a custom config file" @@ -84,11 +75,16 @@ pub async fn main() -> Result<(), IngesterError> { // BACKGROUND TASKS -------------------------------------------- //Setup definitions for background tasks - let task_runner_config = 
config.background_task_runner_config.clone().unwrap_or_default(); + let task_runner_config = config + .background_task_runner_config + .clone() + .unwrap_or_default(); let bg_task_definitions: Vec> = vec![Box::new(DownloadMetadataTask { lock_duration: task_runner_config.lock_duration, max_attempts: task_runner_config.max_attempts, - timeout: Some(time::Duration::from_secs(task_runner_config.timeout.unwrap_or(3))), + timeout: Some(time::Duration::from_secs( + task_runner_config.timeout.unwrap_or(3), + )), })]; let mut background_task_manager = @@ -107,7 +103,7 @@ pub async fn main() -> Result<(), IngesterError> { ACCOUNT_STREAM, )?; let mut timer_txn = StreamSizeTimer::new( - stream_metrics_timer.clone(), + stream_metrics_timer, config.messenger_config.clone(), TRANSACTION_STREAM, )?; @@ -121,10 +117,10 @@ pub async fn main() -> Result<(), IngesterError> { // Stream Consumers Setup ------------------------------------- if role == IngesterRole::Ingester || role == IngesterRole::All { - let (ack_task, ack_sender) = + let (_ack_task, ack_sender) = ack_worker::(config.get_messneger_client_config()); for i in 0..config.get_account_stream_worker_count() { - let account = account_worker::( + let _account = account_worker::( database_pool.clone(), config.get_messneger_client_config(), bg_task_sender.clone(), @@ -137,7 +133,7 @@ pub async fn main() -> Result<(), IngesterError> { ); } for i in 0..config.get_transaction_stream_worker_count() { - let txn = transaction_worker::( + let _txn = transaction_worker::( database_pool.clone(), config.get_messneger_client_config(), bg_task_sender.clone(), @@ -154,7 +150,7 @@ pub async fn main() -> Result<(), IngesterError> { // Setup Stream Size Timers, these are small processes that run every 60 seconds and farm metrics for the size of the streams. // If metrics are disabled, these will not run. 
if role == IngesterRole::BackgroundTaskRunner || role == IngesterRole::All { - let background_runner_config = config.clone().background_task_runner_config;; + let background_runner_config = config.clone().background_task_runner_config; tasks.spawn(background_task_manager.start_runner(background_runner_config)); } // Backfiller Setup ------------------------------------------ diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs index af0b64ac4..9ec27738e 100644 --- a/nft_ingester/src/program_transformers/bubblegum/burn.rs +++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs @@ -1,10 +1,17 @@ -use super::{save_changelog_event, update_asset}; -use crate::error::IngesterError; +use crate::{ + error::IngesterError, + program_transformers::bubblegum::{ + save_changelog_event, u32_to_u8_array, upsert_asset_with_seq, + }, +}; use anchor_lang::prelude::Pubkey; use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; use digital_asset_types::dao::asset; use log::debug; -use sea_orm::{entity::*, ConnectionTrait, TransactionTrait}; +use sea_orm::{ + entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, + TransactionTrait, +}; pub async fn burn<'c, T>( parsing_result: &BubblegumInstruction, @@ -26,27 +33,32 @@ where &mpl_bubblegum::ID, ); debug!("Indexing burn for asset id: {:?}", asset_id); - let id_bytes = asset_id.to_bytes().to_vec(); - let asset_to_update = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), + let id_bytes = asset_id.to_bytes(); + + let asset_model = asset::ActiveModel { + id: Set(id_bytes.to_vec()), burnt: Set(true), - seq: Set(seq as i64), ..Default::default() }; - // Don't send sequence number with this update, because we will always - // run this update even if it's from a backfill/replay. - update_asset(txn, id_bytes, None, asset_to_update).await?; + + // Upsert asset table `burnt` column. 
+ let query = asset::Entity::insert(asset_model) + .on_conflict( + OnConflict::columns([asset::Column::Id]) + .update_columns([ + asset::Column::Burnt, + //TODO maybe handle slot updated. + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + txn.execute(query).await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + return Ok(()); } Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), )) } - -// PDA lookup requires an 8-byte array. -fn u32_to_u8_array(value: u32) -> [u8; 8] { - let bytes: [u8; 4] = value.to_le_bytes(); - let mut result: [u8; 8] = [0; 8]; - result[..4].copy_from_slice(&bytes); - result -} diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs index a80a550b9..066e8fd1e 100644 --- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs @@ -1,13 +1,15 @@ use crate::{ error::IngesterError, + program_transformers::bubblegum::{ + save_changelog_event, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + }, }; -use super::{update_asset, save_changelog_event}; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema}, }; -use digital_asset_types::dao::asset; -use sea_orm::{entity::*, ConnectionTrait, TransactionTrait}; +use sea_orm::{ConnectionTrait, TransactionTrait}; pub async fn cancel_redeem<'c, T>( parsing_result: &BubblegumInstruction, @@ -19,29 +21,43 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, txn).await?; + #[allow(unreachable_patterns)] return match le.schema { LeafSchema::V1 { id, - delegate, owner, + delegate, .. 
} => { - let id_bytes = id.to_bytes().to_vec(); + let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate { None } else { Some(delegate.to_bytes().to_vec()) }; - let owner_bytes = owner.to_bytes().to_vec(); - let asset_to_update = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), - leaf: Set(Some(le.leaf_hash.to_vec())), - delegate: Set(delegate), - owner: Set(Some(owner_bytes)), - seq: Set(seq as i64), // gummyroll seq - ..Default::default() - }; - update_asset(txn, id_bytes, Some(seq), asset_to_update).await + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + txn, + id_bytes.to_vec(), + Some(le.leaf_hash.to_vec()), + Some(seq as i64), + false, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. + upsert_asset_with_owner_and_delegate_info( + txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await } _ => Err(IngesterError::NotImplemented), }; diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index 88b323f8e..46aae4b54 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -1,14 +1,17 @@ +use crate::{ + error::IngesterError, + program_transformers::bubblegum::{ + save_changelog_event, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + }, +}; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, }; -use digital_asset_types::dao::{asset, asset_grouping}; -use sea_orm::{entity::*, query::*, sea_query::OnConflict, DbBackend, Set, Unchanged}; +use digital_asset_types::dao::asset_grouping; +use 
sea_orm::{entity::*, query::*, sea_query::OnConflict, DbBackend, Set}; -use crate::{ - error::IngesterError, -}; -use super::{update_asset, save_changelog_event}; pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, @@ -22,24 +25,50 @@ where // Do we need to update the `slot_updated` field as well as part of the table // updates below? let seq = save_changelog_event(cl, bundle.slot, txn).await?; + #[allow(unreachable_patterns)] match le.schema { - LeafSchema::V1 { id, .. } => { - let id_bytes = id.to_bytes().to_vec(); - - let asset_to_update = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), - leaf: Set(Some(le.leaf_hash.to_vec())), - seq: Set(seq as i64), - ..Default::default() + LeafSchema::V1 { + id, + owner, + delegate, + .. + } => { + let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); + let delegate = if owner == delegate { + None + } else { + Some(delegate.to_bytes().to_vec()) }; - update_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?; + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + txn, + id_bytes.to_vec(), + Some(le.leaf_hash.to_vec()), + Some(seq as i64), + false, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. 
+ upsert_asset_with_owner_and_delegate_info( + txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; if verify { if let Some(Payload::SetAndVerifyCollection { collection }) = parsing_result.payload { let grouping = asset_grouping::ActiveModel { - asset_id: Set(id_bytes.clone()), + asset_id: Set(id_bytes.to_vec()), group_key: Set("collection".to_string()), group_value: Set(collection.to_string()), seq: Set(seq as i64), diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index a549c80df..a8cc73077 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -2,12 +2,15 @@ use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, }; -use digital_asset_types::dao::{asset, asset_creators}; -use sea_orm::{ConnectionTrait, Set, TransactionTrait, Unchanged}; +use digital_asset_types::dao::asset_creators; +use sea_orm::{ConnectionTrait, Set, TransactionTrait}; use crate::{ - program_transformers::bubblegum::{update_asset, update_creator}, error::IngesterError, + program_transformers::bubblegum::{ + update_creator, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, + upsert_asset_with_seq, + }, }; use super::save_changelog_event; @@ -36,18 +39,45 @@ where // updates below? let seq = save_changelog_event(cl, bundle.slot, txn).await?; + #[allow(unreachable_patterns)] let asset_id_bytes = match le.schema { - LeafSchema::V1 { id, .. } => { - let id_bytes = id.to_bytes().to_vec(); - let asset_to_update = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), - leaf: Set(Some(le.leaf_hash.to_vec())), - seq: Set(seq as i64), - ..Default::default() + LeafSchema::V1 { + id, + owner, + delegate, + .. 
+ } => { + let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); + let delegate = if owner == delegate { + None + } else { + Some(delegate.to_bytes().to_vec()) }; - update_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?; - id_bytes + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + txn, + id_bytes.to_vec(), + Some(le.leaf_hash.to_vec()), + Some(seq as i64), + false, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. + upsert_asset_with_owner_and_delegate_info( + txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + + id_bytes.to_vec() } _ => return Err(IngesterError::NotImplemented), }; diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 86362f2f6..1b5975f23 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,8 +1,8 @@ use crate::error::IngesterError; use digital_asset_types::dao::{asset, asset_creators, backfill_items, cl_items}; -use log::{debug, info, warn}; +use log::{debug, info}; use sea_orm::{ - entity::*, query::*, sea_query::OnConflict, ColumnTrait, DbBackend, DbErr, EntityTrait, + query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, }; use spl_account_compression::events::ChangeLogEventV1; @@ -14,7 +14,7 @@ pub async fn save_changelog_event<'c, T>( where T: ConnectionTrait + TransactionTrait, { - insert_change_log(change_log_event, slot, txn, false).await?; + insert_change_log(change_log_event, slot, txn).await?; Ok(change_log_event.seq) } @@ -26,7 +26,6 @@ pub async fn insert_change_log<'c, T>( change_log_event: &ChangeLogEventV1, slot: u64, txn: &T, - filling: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, @@ -71,9 +70,7 @@ 
where .to_owned(), ) .build(DbBackend::Postgres); - if !filling { - query.sql = format!("{} WHERE excluded.seq > cl_items.seq", query.sql); - } + query.sql = format!("{} WHERE excluded.seq > cl_items.seq", query.sql); txn.execute(query) .await .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; @@ -114,42 +111,157 @@ where //TODO -> set maximum size of path and break into multiple statements } -pub async fn update_asset( +pub async fn upsert_asset_with_leaf_info( txn: &T, id: Vec, - seq: Option, - model: asset::ActiveModel, + leaf: Option>, + seq: Option, + was_decompressed: bool, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { - let update_one = if let Some(seq) = seq { - asset::Entity::update(model).filter( - Condition::all() - .add(asset::Column::Id.eq(id.clone())) - .add(asset::Column::Seq.lte(seq)), - ) - } else { - asset::Entity::update(model).filter(asset::Column::Id.eq(id.clone())) + let model = asset::ActiveModel { + id: Set(id), + leaf: Set(leaf), + leaf_seq: Set(seq), + ..Default::default() }; - match update_one.exec(txn).await { - Ok(_) => Ok(()), - Err(err) => match err { - DbErr::RecordNotFound(ref s) => { - if s.contains("None of the database rows are affected") { - warn!( - "Update failed. No asset found for id {}.", - bs58::encode(id).into_string() - ); - Ok(()) - } else { - Err(IngesterError::from(err)) - } - } - _ => Err(IngesterError::from(err)), - }, + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::column(asset::Column::Id) + .update_columns([asset::Column::Leaf, asset::Column::LeafSeq]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + // If we are indexing decompression we will update the leaf regardless of if we have previously + // indexed decompression and regardless of seq. 
+ if !was_decompressed { + query.sql = format!( + "{} WHERE (NOT asset.was_decompressed) AND (excluded.leaf_seq > asset.leaf_seq OR asset.leaf_seq IS NULL)", + query.sql + ); } + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_asset_with_owner_and_delegate_info( + txn: &T, + id: Vec, + owner: Vec, + delegate: Option>, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset::ActiveModel { + id: Set(id), + owner: Set(Some(owner)), + delegate: Set(delegate), + owner_delegate_seq: Set(Some(seq)), // gummyroll seq + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::column(asset::Column::Id) + .update_columns([ + asset::Column::Owner, + asset::Column::Delegate, + asset::Column::OwnerDelegateSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.owner_delegate_seq > asset.owner_delegate_seq OR asset.owner_delegate_seq IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_asset_with_compression_info( + txn: &T, + id: Vec, + compressed: bool, + compressible: bool, + supply: i64, + supply_mint: Option>, + was_decompressed: bool, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset::ActiveModel { + id: Set(id), + compressed: Set(compressed), + compressible: Set(compressible), + supply: Set(supply), + supply_mint: Set(supply_mint), + was_decompressed: Set(was_decompressed), + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::columns([asset::Column::Id]) + .update_columns([ + asset::Column::Compressed, + asset::Column::Compressible, + asset::Column::Supply, + asset::Column::SupplyMint, + 
asset::Column::WasDecompressed, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!("{} WHERE NOT asset.was_decompressed", query.sql); + txn.execute(query).await?; + + Ok(()) +} + +pub async fn upsert_asset_with_seq(txn: &T, id: Vec, seq: i64) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset::ActiveModel { + id: Set(id), + seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset::Entity::insert(model) + .on_conflict( + OnConflict::column(asset::Column::Id) + .update_columns([asset::Column::Seq]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + query.sql = format!( + "{} WHERE excluded.seq > asset.seq OR asset.seq IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) } pub async fn update_creator( diff --git a/nft_ingester/src/program_transformers/bubblegum/decompress.rs b/nft_ingester/src/program_transformers/bubblegum/decompress.rs index fa81a42be..29d7ff096 100644 --- a/nft_ingester/src/program_transformers/bubblegum/decompress.rs +++ b/nft_ingester/src/program_transformers/bubblegum/decompress.rs @@ -1,43 +1,33 @@ -use crate::error::IngesterError; -use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; -use digital_asset_types::dao::asset; -use sea_orm::{ - entity::*, query::*, ColumnTrait, ConnectionTrait, DbBackend, EntityTrait, +use crate::{ + error::IngesterError, + program_transformers::bubblegum::{ + upsert_asset_with_compression_info, upsert_asset_with_leaf_info, + }, }; +use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; +use sea_orm::{ConnectionTrait, TransactionTrait}; pub async fn decompress<'c, T>( - parsing_result: &BubblegumInstruction, + _parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, ) -> Result<(), IngesterError> where T: ConnectionTrait + 
TransactionTrait, { - let id_bytes = bundle.keys.get(3).unwrap().0.as_slice().to_vec(); - - let model = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), - leaf: Set(None), - compressed: Set(false), - compressible: Set(false), - supply: Set(1), - supply_mint: Set(Some(id_bytes.clone())), - ..Default::default() - }; + let id_bytes = bundle.keys.get(3).unwrap().0.as_slice(); - // After the decompress instruction runs, the asset is no longer managed - // by Bubblegum and Gummyroll, so there will not be any other instructions - // after this one. - // - // Do not run this command if the asset is already marked as - // decompressed. - let query = asset::Entity::update(model) - .filter( - Condition::all() - .add(asset::Column::Id.eq(id_bytes.clone())) - .add(asset::Column::Compressed.eq(true)), - ) - .build(DbBackend::Postgres); + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info(txn, id_bytes.to_vec(), None, None, true).await?; - txn.execute(query).await.map(|_| ()).map_err(Into::into) + upsert_asset_with_compression_info( + txn, + id_bytes.to_vec(), + false, + false, + 1, + Some(id_bytes.to_vec()), + true, + ) + .await } diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs index e535fcf9e..e3d75b219 100644 --- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs +++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs @@ -1,13 +1,15 @@ use crate::{ error::IngesterError, + program_transformers::bubblegum::{ + save_changelog_event, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + }, }; -use super::{update_asset, save_changelog_event}; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema}, }; -use digital_asset_types::dao::asset; -use sea_orm::{entity::*, ConnectionTrait, TransactionTrait}; +use sea_orm::{ConnectionTrait, 
TransactionTrait}; pub async fn delegate<'c, T>( parsing_result: &BubblegumInstruction, @@ -19,29 +21,43 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, txn).await?; + #[allow(unreachable_patterns)] return match le.schema { LeafSchema::V1 { id, - delegate, owner, + delegate, .. } => { - let id_bytes = id.to_bytes().to_vec(); + let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate { None } else { Some(delegate.to_bytes().to_vec()) }; - let owner_bytes = owner.to_bytes().to_vec(); - let asset_to_update = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), - leaf: Set(Some(le.leaf_hash.to_vec())), - delegate: Set(delegate), - owner: Set(Some(owner_bytes)), - seq: Set(seq as i64), // gummyroll seq - ..Default::default() - }; - update_asset(txn, id_bytes, Some(seq), asset_to_update).await + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + txn, + id_bytes.to_vec(), + Some(le.leaf_hash.to_vec()), + Some(seq as i64), + false, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. 
+ upsert_asset_with_owner_and_delegate_info( + txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await } _ => Err(IngesterError::NotImplemented), }; diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index e662292c2..7cf2facbc 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -1,15 +1,19 @@ use super::save_changelog_event; use crate::{ error::IngesterError, + program_transformers::bubblegum::{ + upsert_asset_with_compression_info, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; -use blockbuster::token_metadata::{ - pda::find_master_edition_account, - state::{TokenStandard, UseMethod, Uses}, -}; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, + token_metadata::{ + pda::find_master_edition_account, + state::{TokenStandard, UseMethod, Uses}, + }, }; use chrono::Utc; use digital_asset_types::{ @@ -48,6 +52,7 @@ where ) { let seq = save_changelog_event(cl, bundle.slot, txn).await?; let metadata = args; + #[allow(unreachable_patterns)] return match le.schema { LeafSchema::V1 { id, @@ -92,7 +97,6 @@ where metadata: Set(JsonValue::String("processing".to_string())), metadata_mutability: Set(Mutability::Mutable), slot_updated: Set(slot_i), - ..Default::default() }; let mut query = asset_data::Entity::insert(data) @@ -130,42 +134,85 @@ where .unwrap_or("".to_string()) .trim() .to_string(); - let model = asset::ActiveModel { + + // Set initial mint info. 
+ let asset_model = asset::ActiveModel { id: Set(id_bytes.to_vec()), - owner: Set(Some(owner.to_bytes().to_vec())), owner_type: Set(OwnerType::Single), - delegate: Set(delegate), frozen: Set(false), - supply: Set(1), - supply_mint: Set(None), - compressed: Set(true), tree_id: Set(Some(bundle.keys.get(3).unwrap().0.to_vec())), - specification_version: Set(SpecificationVersions::V1), - specification_asset_class: Set(SpecificationAssetClass::Nft), - nonce: Set(nonce as i64), - leaf: Set(Some(le.leaf_hash.to_vec())), + specification_version: Set(Some(SpecificationVersions::V1)), + specification_asset_class: Set(Some(SpecificationAssetClass::Nft)), + nonce: Set(Some(nonce as i64)), royalty_target_type: Set(RoyaltyTargetType::Creators), royalty_target: Set(None), royalty_amount: Set(metadata.seller_fee_basis_points as i32), //basis points asset_data: Set(Some(id_bytes.to_vec())), - seq: Set(seq as i64), // gummyroll seq - slot_updated: Set(slot_i), + slot_updated: Set(Some(slot_i)), data_hash: Set(Some(data_hash)), creator_hash: Set(Some(creator_hash)), ..Default::default() }; - // Do not attempt to modify any existing values: - // `ON CONFLICT ('id') DO NOTHING`. - let query = asset::Entity::insert(model) + // Upsert asset table base info. + let query = asset::Entity::insert(asset_model) .on_conflict( OnConflict::columns([asset::Column::Id]) - .do_nothing() + .update_columns([ + asset::Column::OwnerType, + asset::Column::Frozen, + asset::Column::TreeId, + asset::Column::SpecificationVersion, + asset::Column::SpecificationAssetClass, + asset::Column::Nonce, + asset::Column::RoyaltyTargetType, + asset::Column::RoyaltyTarget, + asset::Column::RoyaltyAmount, + asset::Column::AssetData, + //TODO maybe handle slot updated differently. + asset::Column::SlotUpdated, + asset::Column::DataHash, + asset::Column::CreatorHash, + ]) .to_owned(), ) .build(DbBackend::Postgres); txn.execute(query).await?; + // Partial update of asset table with just compression info elements. 
+ upsert_asset_with_compression_info( + txn, + id_bytes.to_vec(), + true, + false, + 1, + None, + false, + ) + .await?; + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + txn, + id_bytes.to_vec(), + Some(le.leaf_hash.to_vec()), + Some(seq as i64), + false, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. + upsert_asset_with_owner_and_delegate_info( + txn, + id_bytes.to_vec(), + owner.to_bytes().to_vec(), + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + let attachment = asset_v1_account_attachments::ActiveModel { id: Set(edition_attachment_address.to_bytes().to_vec()), slot_updated: Set(slot_i), diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs index f8444d26a..66b9d90d7 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs @@ -126,3 +126,11 @@ where } Ok(()) } + +// PDA lookup requires an 8-byte array. 
+fn u32_to_u8_array(value: u32) -> [u8; 8] { + let bytes: [u8; 4] = value.to_le_bytes(); + let mut result: [u8; 8] = [0; 8]; + result[..4].copy_from_slice(&bytes); + result +} diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs index 7a440586e..4ee4126c8 100644 --- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs @@ -1,15 +1,13 @@ use crate::{ - program_transformers::bubblegum::{update_asset}, error::IngesterError, + program_transformers::bubblegum::{ + save_changelog_event, u32_to_u8_array, upsert_asset_with_leaf_info, upsert_asset_with_seq, + }, }; - -use super::save_changelog_event; -use blockbuster::{ - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, LeafSchema}, -}; -use digital_asset_types::dao::asset; -use sea_orm::{entity::*, ConnectionTrait, TransactionTrait}; +use anchor_lang::prelude::Pubkey; +use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}; +use log::debug; +use sea_orm::{ConnectionTrait, TransactionTrait}; pub async fn redeem<'c, T>( parsing_result: &BubblegumInstruction, @@ -19,34 +17,33 @@ pub async fn redeem<'c, T>( where T: ConnectionTrait + TransactionTrait, { - if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { + if let Some(cl) = &parsing_result.tree_update { let seq = save_changelog_event(cl, bundle.slot, txn).await?; - return match le.schema { - LeafSchema::V1 { - id, - delegate, - owner, - .. 
- } => { - let id_bytes = id.to_bytes().to_vec(); - let delegate = if owner == delegate { - None - } else { - Some(delegate.to_bytes().to_vec()) - }; - let owner_bytes = owner.to_bytes().to_vec(); - let asset_to_update = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), - leaf: Set(Some(vec![0; 32])), - delegate: Set(delegate), - owner: Set(Some(owner_bytes)), - seq: Set(seq as i64), - ..Default::default() - }; - update_asset(txn, id_bytes, Some(seq), asset_to_update).await - } - _ => Err(IngesterError::NotImplemented), - }; + let leaf_index = cl.index; + let (asset_id, _) = Pubkey::find_program_address( + &[ + "asset".as_bytes(), + cl.id.as_ref(), + u32_to_u8_array(leaf_index).as_ref(), + ], + &mpl_bubblegum::ID, + ); + debug!("Indexing redeem for asset id: {:?}", asset_id); + let id_bytes = asset_id.to_bytes(); + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + txn, + id_bytes.to_vec(), + Some(vec![0; 32]), + Some(seq as i64), + false, + ) + .await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + + return Ok(()); } Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs index b140b2082..8dce72c55 100644 --- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs +++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs @@ -1,15 +1,16 @@ +use super::save_changelog_event; use crate::{ - program_transformers::bubblegum::{update_asset}, error::IngesterError, + program_transformers::bubblegum::{ + upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, + upsert_asset_with_seq, + }, }; - -use super::save_changelog_event; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema}, }; -use digital_asset_types::dao::asset; -use sea_orm::{entity::*, ConnectionTrait, 
TransactionTrait}; +use sea_orm::{ConnectionTrait, TransactionTrait}; pub async fn transfer<'c, T>( parsing_result: &BubblegumInstruction, @@ -21,29 +22,43 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, txn).await?; + #[allow(unreachable_patterns)] return match le.schema { LeafSchema::V1 { id, - delegate, owner, + delegate, .. } => { - let id_bytes = id.to_bytes().to_vec(); + let id_bytes = id.to_bytes(); + let owner_bytes = owner.to_bytes().to_vec(); let delegate = if owner == delegate { None } else { Some(delegate.to_bytes().to_vec()) }; - let owner_bytes = owner.to_bytes().to_vec(); - let asset_to_update = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), - leaf: Set(Some(le.leaf_hash.to_vec())), - delegate: Set(delegate), - owner: Set(Some(owner_bytes)), - seq: Set(seq as i64), // gummyroll seq - ..Default::default() - }; - update_asset(txn, id_bytes, Some(seq), asset_to_update).await + + // Partial update of asset table with just leaf. + upsert_asset_with_leaf_info( + txn, + id_bytes.to_vec(), + Some(le.leaf_hash.to_vec()), + Some(seq as i64), + false, + ) + .await?; + + // Partial update of asset table with just leaf owner and delegate. 
+ upsert_asset_with_owner_and_delegate_info( + txn, + id_bytes.to_vec(), + owner_bytes, + delegate, + seq as i64, + ) + .await?; + + upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await } _ => Err(IngesterError::NotImplemented), }; diff --git a/nft_ingester/src/program_transformers/token_metadata/master_edition.rs b/nft_ingester/src/program_transformers/token_metadata/master_edition.rs index 78ebbef37..4e30970b6 100644 --- a/nft_ingester/src/program_transformers/token_metadata/master_edition.rs +++ b/nft_ingester/src/program_transformers/token_metadata/master_edition.rs @@ -74,7 +74,7 @@ pub async fn save_master_edition( if let Some((_me, Some(asset))) = master_edition { let mut updatable: asset::ActiveModel = asset.into(); updatable.supply = Set(1); - updatable.specification_asset_class = Set(SpecificationAssetClass::Nft); + updatable.specification_asset_class = Set(Some(SpecificationAssetClass::Nft)); updatable.update(txn).await?; } diff --git a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs index d8d59952a..4ec329dd7 100644 --- a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs +++ b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs @@ -45,7 +45,7 @@ pub async fn burn_v1_asset( let slot_i = slot as i64; let model = asset::ActiveModel { id: Set(id.to_vec()), - slot_updated: Set(slot_i), + slot_updated: Set(Some(slot_i)), burnt: Set(true), ..Default::default() }; @@ -226,11 +226,11 @@ pub async fn save_v1_asset( frozen: Set(false), supply, supply_mint, - specification_version: Set(SpecificationVersions::V1), - specification_asset_class: Set(class), + specification_version: Set(Some(SpecificationVersions::V1)), + specification_asset_class: Set(Some(class)), tree_id: Set(None), - nonce: Set(0), - seq: Set(0), + nonce: Set(Some(0)), + seq: Set(Some(0)), leaf: Set(None), compressed: Set(false), compressible: Set(false), @@ -238,7 +238,7 @@ pub 
async fn save_v1_asset( royalty_target: Set(None), royalty_amount: Set(data.seller_fee_basis_points as i32), //basis points asset_data: Set(Some(id.to_vec())), - slot_updated: Set(slot_i), + slot_updated: Set(Some(slot_i)), burnt: Set(false), ..Default::default() }; diff --git a/prepare-local-docker-env.sh b/prepare-local-docker-env.sh index 548ecf5eb..4d1b6ab25 100755 --- a/prepare-local-docker-env.sh +++ b/prepare-local-docker-env.sh @@ -28,10 +28,6 @@ pushd solana_program_library/account-compression/programs/noop mv ./here/spl_noop.so $CWD/programs/noopb9bkMVfRPU8AsbpTUg8AQkHtKwMYZiFUjNRtMmV.so popd -pushd solana_program_library - rm -rf Cargo.toml -popd - pushd solana_program_library/associated-token-account/program cargo build-bpf --bpf-out-dir ./here mv ./here/spl_associated_token_account.so $CWD/programs/ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL.so @@ -49,4 +45,3 @@ popd rm -rf solana_program_library rm -rf metaplex_program_library - From 3ca1303d31eda8999764d90c4351d1b72b1a547d Mon Sep 17 00:00:00 2001 From: Michael Danenberg <56533526+danenbm@users.noreply.github.com> Date: Wed, 26 Jul 2023 18:16:55 -0700 Subject: [PATCH 2/2] Change creator and collection verification to use upserts to support out of order (#87) * Fix docker preparation script to build SPL * Update owner and delegate in asset table when collection or creator verification occurs * Modify program transformers to upsert in asset table * This allows out-of-order Bubblegum transactions to create and update the asset table. * Upsert leaf schema, owner, delegate, and seq separately since those are updated by all instructions and gated by sequence number to ensure freshest value. * Mint, burn, and decompress happen without regard to sequence number because they operate on unique fields. * Mint and burn have been updated but Decompress still needs to be fixed to handle out of order transactions. * Also remove unused 'filling' variable. 
* Update mint and decompress to be able to upsert asset info out of order * Add second sequence number for compression status fields * Reduce logging in docker * Comment out compressed_seq before regenerating Sea ORM objects * Add migration for asset specification * Update README * Rename PNFT and regenerate Sea ORM types * Apply usage of compressed_seq after regenerating Sea ORM types * Add owner delegate sequence number for owner and delegate fields. Also remove not null constraints for asset fields without defaults. * Regenerating database types * Update handling for non null constrained asset table * Update tests to use new Sea ORM types * Use owner_and_delegate_seq to separate upserts Also update redeem and decompress to not use leaf schema events. * Adding was_decompressed flag to replace compressed_seq compressed_seq won't work because decompression doesn't create a cl_event. * Regenerating Sea ORM types * Update code to use was_decompressed flag * Fix new boolean SQL conditions * Update comment * Remove column updates in asset table during mint for items not in model * Clippy fixes in ingester main * Cleanup debug comment * Allow for sequence number to be NULL (needed after decompress now) * Add leaf specific sequence number to protect that field in asset table * Revert "Allow for sequence number to be NULL (needed after decompress now)" This reverts commit 2713a18ad2ddfac1944a9585774c45706933bf89. 
* Update nft_ingester/src/program_transformers/bubblegum/redeem.rs Co-authored-by: Nicolas Pennie * Change creator verification to use upserts to support out of order * Remove null constraints on asset_creators table * Add null clause to upsert during mint * Rename creator vecs and add comments * Removing comment * Fix typo in migration down function * Fix collection verification and change to use upserts to support out of order processing (#90) * Adding verified flag to asset_grouping table * Regenerate Sea ORM types * Remove null constraints on asset_grouping table * Regenerate Sea ORM types * Update digital asset types and ingester based on new Sea ORM objects * Setting new verified flag in asset_grouping table to be non null with default Also regenerating Sea ORM types * Separate out collection insert in mintV1 into separate upserts * Fix error message * Separate update collection base info from collection verified * Add group info seq to asset_grouping table * Regenerate Sea ORM types * Add group_info_seq checks to collection base info upsert * Add check for verified = true in grouping for Read API * Fix conditions for asset grouping updates * Require grouping to verified to be returned from API in all cases --------- Co-authored-by: Nicolas Pennie --- das_api/src/api/api_impl.rs | 4 +- das_api/src/api/mod.rs | 2 +- das_api/src/main.rs | 4 +- .../src/dao/generated/asset.rs | 12 +- .../src/dao/generated/asset_creators.rs | 8 +- .../src/dao/generated/asset_grouping.rs | 18 ++- .../src/dao/generated/sea_orm_active_enums.rs | 52 +++---- digital_asset_types/src/dao/scopes/asset.rs | 10 +- digital_asset_types/src/dapi/common/asset.rs | 60 ++++---- digital_asset_types/tests/common.rs | 14 +- migration/src/lib.rs | 8 ++ ..._remove_asset_creators_null_constraints.rs | 44 ++++++ ...0720_120101_add_asset_grouping_verified.rs | 37 +++++ ..._remove_asset_grouping_null_constraints.rs | 46 ++++++ .../m20230724_120101_add_group_info_seq.rs | 36 +++++ 
nft_ingester/src/account_updates.rs | 7 +- .../bubblegum/collection_verification.rs | 54 +++---- .../bubblegum/creator_verification.rs | 33 ++--- .../src/program_transformers/bubblegum/db.rs | 134 +++++++++++++++--- .../program_transformers/bubblegum/mint_v1.rs | 100 ++++++++----- .../token_metadata/v1_asset.rs | 12 +- 21 files changed, 484 insertions(+), 211 deletions(-) create mode 100644 migration/src/m20230712_120101_remove_asset_creators_null_constraints.rs create mode 100644 migration/src/m20230720_120101_add_asset_grouping_verified.rs create mode 100644 migration/src/m20230720_130101_remove_asset_grouping_null_constraints.rs create mode 100644 migration/src/m20230724_120101_add_group_info_seq.rs diff --git a/das_api/src/api/api_impl.rs b/das_api/src/api/api_impl.rs index 8c981aaaa..ffdfa2d55 100644 --- a/das_api/src/api/api_impl.rs +++ b/das_api/src/api/api_impl.rs @@ -1,5 +1,3 @@ -use std::vec; - use digital_asset_types::{ dao::{ scopes::asset::get_grouping, @@ -356,7 +354,7 @@ impl ApiContract for DasApi { } = payload; let gs = get_grouping(&self.db_connection, group_key.clone(), group_value.clone()).await?; Ok(GetGroupingResponse { - group_key: group_key, + group_key, group_name: group_value, group_size: gs.size, }) diff --git a/das_api/src/api/mod.rs b/das_api/src/api/mod.rs index 34ba2d37e..0c5546e15 100644 --- a/das_api/src/api/mod.rs +++ b/das_api/src/api/mod.rs @@ -1,4 +1,4 @@ -use crate::{DasApiError, RpcModule}; +use crate::DasApiError; use async_trait::async_trait; use digital_asset_types::rpc::filter::SearchConditionType; use digital_asset_types::rpc::response::AssetList; diff --git a/das_api/src/main.rs b/das_api/src/main.rs index 5450072e9..368eddecb 100644 --- a/das_api/src/main.rs +++ b/das_api/src/main.rs @@ -4,7 +4,7 @@ mod config; mod error; mod validation; -use std::time::{Duration, Instant}; +use std::time::Instant; use { crate::api::DasApi, crate::builder::RpcApiBuilder, @@ -17,7 +17,7 @@ use { std::net::UdpSocket, }; -use hyper::{http, 
Method}; +use hyper::Method; use log::{debug, info}; use tower_http::cors::{Any, CorsLayer}; diff --git a/digital_asset_types/src/dao/generated/asset.rs b/digital_asset_types/src/dao/generated/asset.rs index 642ccf345..0ced69299 100644 --- a/digital_asset_types/src/dao/generated/asset.rs +++ b/digital_asset_types/src/dao/generated/asset.rs @@ -97,8 +97,8 @@ pub enum Relation { AssetData, AssetV1AccountAttachments, AssetCreators, - AssetGrouping, AssetAuthority, + AssetGrouping, } impl ColumnTrait for Column { @@ -148,8 +148,8 @@ impl RelationTrait for Relation { Entity::has_many(super::asset_v1_account_attachments::Entity).into() } Self::AssetCreators => Entity::has_many(super::asset_creators::Entity).into(), - Self::AssetGrouping => Entity::has_many(super::asset_grouping::Entity).into(), Self::AssetAuthority => Entity::has_many(super::asset_authority::Entity).into(), + Self::AssetGrouping => Entity::has_many(super::asset_grouping::Entity).into(), } } } @@ -172,15 +172,15 @@ impl Related for Entity { } } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::AssetGrouping.def() + Relation::AssetAuthority.def() } } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::AssetAuthority.def() + Relation::AssetGrouping.def() } } diff --git a/digital_asset_types/src/dao/generated/asset_creators.rs b/digital_asset_types/src/dao/generated/asset_creators.rs index 68de902d5..21f34dcf7 100644 --- a/digital_asset_types/src/dao/generated/asset_creators.rs +++ b/digital_asset_types/src/dao/generated/asset_creators.rs @@ -19,8 +19,8 @@ pub struct Model { pub creator: Vec, pub share: i32, pub verified: bool, - pub seq: i64, - pub slot_updated: i64, + pub seq: Option, + pub slot_updated: Option, pub position: i16, } @@ -62,8 +62,8 @@ impl ColumnTrait for Column { Self::Creator => ColumnType::Binary.def(), Self::Share => ColumnType::Integer.def(), Self::Verified => ColumnType::Boolean.def(), - Self::Seq => 
ColumnType::BigInteger.def(), - Self::SlotUpdated => ColumnType::BigInteger.def(), + Self::Seq => ColumnType::BigInteger.def().null(), + Self::SlotUpdated => ColumnType::BigInteger.def().null(), Self::Position => ColumnType::SmallInteger.def(), } } diff --git a/digital_asset_types/src/dao/generated/asset_grouping.rs b/digital_asset_types/src/dao/generated/asset_grouping.rs index d0c547640..5d5c0e749 100644 --- a/digital_asset_types/src/dao/generated/asset_grouping.rs +++ b/digital_asset_types/src/dao/generated/asset_grouping.rs @@ -17,9 +17,11 @@ pub struct Model { pub id: i64, pub asset_id: Vec, pub group_key: String, - pub group_value: String, - pub seq: i64, - pub slot_updated: i64, + pub group_value: Option, + pub seq: Option, + pub slot_updated: Option, + pub verified: bool, + pub group_info_seq: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -30,6 +32,8 @@ pub enum Column { GroupValue, Seq, SlotUpdated, + Verified, + GroupInfoSeq, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -56,9 +60,11 @@ impl ColumnTrait for Column { Self::Id => ColumnType::BigInteger.def(), Self::AssetId => ColumnType::Binary.def(), Self::GroupKey => ColumnType::Text.def(), - Self::GroupValue => ColumnType::Text.def(), - Self::Seq => ColumnType::BigInteger.def(), - Self::SlotUpdated => ColumnType::BigInteger.def(), + Self::GroupValue => ColumnType::Text.def().null(), + Self::Seq => ColumnType::BigInteger.def().null(), + Self::SlotUpdated => ColumnType::BigInteger.def().null(), + Self::Verified => ColumnType::Boolean.def(), + Self::GroupInfoSeq => ColumnType::BigInteger.def().null(), } } } diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index 5fb406acc..2be0283e7 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -14,16 +14,6 @@ pub enum Mutability { Unknown, } 
#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] -pub enum ChainMutability { - #[sea_orm(string_value = "immutable")] - Immutable, - #[sea_orm(string_value = "mutable")] - Mutable, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -54,6 +44,22 @@ pub enum TaskStatus { Success, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "royalty_target_type" +)] +pub enum RoyaltyTargetType { + #[sea_orm(string_value = "creators")] + Creators, + #[sea_orm(string_value = "fanout")] + Fanout, + #[sea_orm(string_value = "single")] + Single, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -82,6 +88,16 @@ pub enum SpecificationAssetClass { Unknown, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] +pub enum ChainMutability { + #[sea_orm(string_value = "immutable")] + Immutable, + #[sea_orm(string_value = "mutable")] + Mutable, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -98,22 +114,6 @@ pub enum SpecificationVersions { V2, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "royalty_target_type" -)] -pub enum RoyaltyTargetType { - #[sea_orm(string_value = "creators")] - Creators, - #[sea_orm(string_value = 
"fanout")] - Fanout, - #[sea_orm(string_value = "single")] - Single, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")] pub enum OwnerType { #[sea_orm(string_value = "single")] diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index 43a499fae..56ddcd978 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -65,7 +65,8 @@ pub async fn get_grouping( .filter( Condition::all() .add(asset_grouping::Column::GroupKey.eq(group_key)) - .add(asset_grouping::Column::GroupValue.eq(group_value)), + .add(asset_grouping::Column::GroupValue.eq(group_value)) + .add(asset_grouping::Column::Verified.eq(true)), ) .count(conn) .await?; @@ -83,7 +84,8 @@ pub async fn get_by_grouping( ) -> Result, DbErr> { let condition = asset_grouping::Column::GroupKey .eq(group_key) - .and(asset_grouping::Column::GroupValue.eq(group_value)); + .and(asset_grouping::Column::GroupValue.eq(group_value)) + .and(asset_grouping::Column::Verified.eq(true)); get_by_related_condition( conn, Condition::all() @@ -192,7 +194,7 @@ pub async fn get_related_for_assets( { let id = asset.id.clone(); let fa = FullAsset { - asset: asset, + asset, data: ad.clone(), authorities: vec![], creators: vec![], @@ -228,6 +230,7 @@ pub async fn get_related_for_assets( let grouping = asset_grouping::Entity::find() .filter(asset_grouping::Column::AssetId.is_in(ids.clone())) + .filter(asset_grouping::Column::Verified.eq(true)) .order_by_asc(asset_grouping::Column::AssetId) .all(conn) .await?; @@ -288,6 +291,7 @@ pub async fn get_by_id( .await?; let grouping: Vec = asset_grouping::Entity::find() .filter(asset_grouping::Column::AssetId.eq(asset.id.clone())) + .filter(asset_grouping::Column::Verified.eq(true)) .all(conn) .await?; Ok(FullAsset { diff --git 
a/digital_asset_types/src/dapi/common/asset.rs b/digital_asset_types/src/dapi/common/asset.rs index 917c2f105..43f01856b 100644 --- a/digital_asset_types/src/dapi/common/asset.rs +++ b/digital_asset_types/src/dapi/common/asset.rs @@ -2,15 +2,14 @@ use crate::dao::sea_orm_active_enums::SpecificationVersions; use crate::dao::FullAsset; use crate::dao::Pagination; use crate::dao::{asset, asset_authority, asset_creators, asset_data, asset_grouping}; - use crate::rpc::filter::{AssetSortBy, AssetSortDirection, AssetSorting}; use crate::rpc::response::{AssetError, AssetList}; use crate::rpc::{ Asset as RpcAsset, Authority, Compression, Content, Creator, File, Group, Interface, MetadataMap, Ownership, Royalty, Scope, Supply, Uses, }; - use jsonpath_lib::JsonPathError; +use log::warn; use mime_guess::Mime; use sea_orm::DbErr; use serde_json::Value; @@ -18,8 +17,6 @@ use std::collections::HashMap; use std::path::Path; use url::Url; -use log::{debug, info, warn}; - pub fn to_uri(uri: String) -> Option { Url::parse(&*uri).ok() } @@ -30,7 +27,9 @@ pub fn get_mime(url: Url) -> Option { pub fn get_mime_type_from_uri(uri: String) -> String { let default_mime_type = "image/png".to_string(); - to_uri(uri).and_then(get_mime).map_or(default_mime_type, |m| m.to_string()) + to_uri(uri) + .and_then(get_mime) + .map_or(default_mime_type, |m| m.to_string()) } pub fn file_from_str(str: String) -> File { @@ -173,28 +172,25 @@ pub fn v1_content_from_json(asset_data: &asset_data::Model) -> Result { if let Some(str_uri) = u.as_str() { - let file = - if let Some(str_mime) = m.as_str() { - File { - uri: Some(str_uri.to_string()), - mime: Some(str_mime.to_string()), - quality: None, - contexts: None, - } - } else { - warn!("Mime is not string: {:?}", m); - file_from_str(str_uri.to_string()) - }; - actual_files.insert( - str_uri.to_string().clone(), - file, - ); + let file = if let Some(str_mime) = m.as_str() { + File { + uri: Some(str_uri.to_string()), + mime: Some(str_mime.to_string()), + 
quality: None, + contexts: None, + } + } else { + warn!("Mime is not string: {:?}", m); + file_from_str(str_uri.to_string()) + }; + actual_files.insert(str_uri.to_string().clone(), file); } else { warn!("URI is not string: {:?}", u); } } (Some(u), None) => { - let str_uri = serde_json::to_string(u).unwrap_or_else(|_|String::new()); + let str_uri = + serde_json::to_string(u).unwrap_or_else(|_| String::new()); actual_files.insert(str_uri.clone(), file_from_str(str_uri)); } _ => {} @@ -250,14 +246,18 @@ pub fn to_creators(creators: Vec) -> Vec { .collect() } -pub fn to_grouping(groups: Vec) -> Vec { - groups - .iter() - .map(|a| Group { - group_key: a.group_key.clone(), - group_value: a.group_value.clone(), +pub fn to_grouping(groups: Vec) -> Result, DbErr> { + fn find_group(model: &asset_grouping::Model) -> Result { + Ok(Group { + group_key: model.group_key.clone(), + group_value: model + .group_value + .clone() + .ok_or(DbErr::Custom("Group value not found".to_string()))?, }) - .collect() + } + + groups.iter().map(find_group).collect() } pub fn get_interface(asset: &asset::Model) -> Result { @@ -286,7 +286,7 @@ pub fn asset_to_rpc(asset: FullAsset) -> Result { } = asset; let rpc_authorities = to_authority(authorities); let rpc_creators = to_creators(creators); - let rpc_groups = to_grouping(groups); + let rpc_groups = to_grouping(groups)?; let interface = get_interface(&asset)?; let content = get_content(&asset, &data)?; let mut chain_data_selector_fn = jsonpath_lib::selector(&data.chain_data); diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index 9bb28a006..653b61e7b 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -179,8 +179,8 @@ pub fn create_asset_creator( creator, share, verified, - seq: 0, - slot_updated: 0, + seq: Some(0), + slot_updated: Some(0), position: 0, }, ) @@ -218,16 +218,18 @@ pub fn create_asset_grouping( asset_grouping::ActiveModel { asset_id: 
Set(asset_id.clone()), group_key: Set(String::from("collection")), - group_value: Set(bs58::encode(collection).into_string()), + group_value: Set(Some(bs58::encode(collection).into_string())), ..Default::default() }, asset_grouping::Model { asset_id, - group_value: bs58::encode(collection).into_string(), - seq: 0, + group_value: Some(bs58::encode(collection).into_string()), + seq: Some(0), id: row_num, group_key: "collection".to_string(), - slot_updated: 0, + slot_updated: Some(0), + verified: false, + group_info_seq: Some(0), }, ) } diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 6edefc669..11e8b5d0f 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -21,6 +21,10 @@ mod m20230601_120101_add_pnft_enum_val; mod m20230615_120101_remove_asset_null_constraints; mod m20230620_120101_add_was_decompressed; mod m20230623_120101_add_leaf_sequence_number; +mod m20230712_120101_remove_asset_creators_null_constraints; +mod m20230720_120101_add_asset_grouping_verified; +mod m20230720_130101_remove_asset_grouping_null_constraints; +mod m20230724_120101_add_group_info_seq; pub struct Migrator; @@ -49,6 +53,10 @@ impl MigratorTrait for Migrator { Box::new(m20230615_120101_remove_asset_null_constraints::Migration), Box::new(m20230620_120101_add_was_decompressed::Migration), Box::new(m20230623_120101_add_leaf_sequence_number::Migration), + Box::new(m20230712_120101_remove_asset_creators_null_constraints::Migration), + Box::new(m20230720_120101_add_asset_grouping_verified::Migration), + Box::new(m20230720_130101_remove_asset_grouping_null_constraints::Migration), + Box::new(m20230724_120101_add_group_info_seq::Migration), ] } } diff --git a/migration/src/m20230712_120101_remove_asset_creators_null_constraints.rs b/migration/src/m20230712_120101_remove_asset_creators_null_constraints.rs new file mode 100644 index 000000000..73251b0a4 --- /dev/null +++ b/migration/src/m20230712_120101_remove_asset_creators_null_constraints.rs @@ -0,0 +1,44 @@ +use 
sea_orm_migration::{ + prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_creators + ALTER COLUMN seq DROP NOT NULL, + ALTER COLUMN slot_updated DROP NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_creators + ALTER COLUMN seq SET NOT NULL, + ALTER COLUMN slot_updated SET NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } +} diff --git a/migration/src/m20230720_120101_add_asset_grouping_verified.rs b/migration/src/m20230720_120101_add_asset_grouping_verified.rs new file mode 100644 index 000000000..8f1cfd4cc --- /dev/null +++ b/migration/src/m20230720_120101_add_asset_grouping_verified.rs @@ -0,0 +1,37 @@ +use digital_asset_types::dao::asset_grouping; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset_grouping::Entity) + .add_column( + ColumnDef::new(Alias::new("verified")) + .boolean() + .not_null() + .default(false), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset_grouping::Entity) + .drop_column(Alias::new("verified")) + .to_owned(), + ) + .await + } +} 
diff --git a/migration/src/m20230720_130101_remove_asset_grouping_null_constraints.rs b/migration/src/m20230720_130101_remove_asset_grouping_null_constraints.rs new file mode 100644 index 000000000..debefecf2 --- /dev/null +++ b/migration/src/m20230720_130101_remove_asset_grouping_null_constraints.rs @@ -0,0 +1,46 @@ +use sea_orm_migration::{ + prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_grouping + ALTER COLUMN group_value DROP NOT NULL, + ALTER COLUMN seq DROP NOT NULL, + ALTER COLUMN slot_updated DROP NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_grouping + ALTER COLUMN group_value SET NOT NULL, + ALTER COLUMN seq SET NOT NULL, + ALTER COLUMN slot_updated SET NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } +} diff --git a/migration/src/m20230724_120101_add_group_info_seq.rs b/migration/src/m20230724_120101_add_group_info_seq.rs new file mode 100644 index 000000000..89f2414a5 --- /dev/null +++ b/migration/src/m20230724_120101_add_group_info_seq.rs @@ -0,0 +1,36 @@ +use digital_asset_types::dao::asset_grouping; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset_grouping::Entity) + 
.add_column(ColumnDef::new(Alias::new("group_info_seq")).big_integer()) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset_grouping::Entity) + .drop_column(Alias::new("group_info_seq")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs index f25933fd4..aa22ee54e 100644 --- a/nft_ingester/src/account_updates.rs +++ b/nft_ingester/src/account_updates.rs @@ -1,10 +1,7 @@ -use std::{ - sync::Arc, -}; +use std::sync::Arc; use crate::{ - metric, metrics::capture_result, - program_transformers::ProgramTransformer, tasks::TaskData, + metric, metrics::capture_result, program_transformers::ProgramTransformer, tasks::TaskData, }; use cadence_macros::{is_global_default_set, statsd_count, statsd_time}; use chrono::Utc; diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index 46aae4b54..b1acae63b 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -2,15 +2,15 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ save_changelog_event, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_collection_info, + upsert_collection_verified, }, }; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, }; -use digital_asset_types::dao::asset_grouping; -use sea_orm::{entity::*, query::*, sea_query::OnConflict, DbBackend, Set}; +use sea_orm::query::*; pub async fn process<'c, T>( parsing_result: 
&BubblegumInstruction, @@ -63,40 +63,22 @@ where upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; - if verify { - if let Some(Payload::SetAndVerifyCollection { collection }) = - parsing_result.payload - { - let grouping = asset_grouping::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - group_key: Set("collection".to_string()), - group_value: Set(collection.to_string()), - seq: Set(seq as i64), - slot_updated: Set(bundle.slot as i64), - ..Default::default() - }; - let mut query = asset_grouping::Entity::insert(grouping) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .update_columns([ - asset_grouping::Column::GroupKey, - asset_grouping::Column::GroupValue, - asset_grouping::Column::Seq, - asset_grouping::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_grouping.slot_updated AND excluded.seq >= asset_grouping.seq", - query.sql - ); - txn.execute(query).await?; - } + if let Some(Payload::SetAndVerifyCollection { collection }) = parsing_result.payload + { + // Upsert into `asset_grouping` table with base collection info. + upsert_collection_info( + txn, + id_bytes.to_vec(), + collection.to_string(), + bundle.slot as i64, + seq as i64, + ) + .await?; } + + // Partial update with whether collection is verified and the `seq` number. 
+ upsert_collection_verified(txn, id_bytes.to_vec(), verify, seq as i64).await?; + id_bytes } _ => return Err(IngesterError::NotImplemented), diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index a8cc73077..b40006401 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -1,19 +1,15 @@ -use blockbuster::{ - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, -}; -use digital_asset_types::dao::asset_creators; -use sea_orm::{ConnectionTrait, Set, TransactionTrait}; - use crate::{ error::IngesterError, program_transformers::bubblegum::{ - update_creator, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, - upsert_asset_with_seq, + save_changelog_event, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_creator_verified, }, }; - -use super::save_changelog_event; +use blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, +}; +use sea_orm::{ConnectionTrait, TransactionTrait}; pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, @@ -82,21 +78,12 @@ where _ => return Err(IngesterError::NotImplemented), }; - // The primary key `id` is not required here since `update_creator` uses `update_many` - // for the time being. 
- let creator_to_update = asset_creators::ActiveModel { - //id: Unchanged(14), - verified: Set(value), - seq: Set(seq as i64), - ..Default::default() - }; - - update_creator( + upsert_creator_verified( txn, asset_id_bytes, creator.to_bytes().to_vec(), - seq, - creator_to_update, + value, + seq as i64, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 1b5975f23..0ba5e7b06 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,5 +1,5 @@ use crate::error::IngesterError; -use digital_asset_types::dao::{asset, asset_creators, backfill_items, cl_items}; +use digital_asset_types::dao::{asset, asset_creators, asset_grouping, backfill_items, cl_items}; use log::{debug, info}; use sea_orm::{ query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, @@ -264,30 +264,132 @@ where Ok(()) } -pub async fn update_creator( +pub async fn upsert_creator_verified( txn: &T, asset_id: Vec, creator: Vec, - seq: u64, - model: asset_creators::ActiveModel, + verified: bool, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset_creators::ActiveModel { + asset_id: Set(asset_id), + creator: Set(creator), + verified: Set(verified), + seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset_creators::Entity::insert(model) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ + asset_creators::Column::Verified, + asset_creators::Column::Seq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + query.sql = format!("{} WHERE excluded.seq > asset_creators.seq", query.sql); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_collection_info( + txn: &T, 
+    asset_id: Vec<u8>,
+    group_value: String,
+    slot_updated: i64,
+    seq: i64,
+) -> Result<(), IngesterError>
+where
+    T: ConnectionTrait + TransactionTrait,
+{
+    let model = asset_grouping::ActiveModel {
+        asset_id: Set(asset_id),
+        group_key: Set("collection".to_string()),
+        group_value: Set(Some(group_value)),
+        slot_updated: Set(Some(slot_updated)),
+        group_info_seq: Set(Some(seq)),
+        ..Default::default()
+    };
+
+    let mut query = asset_grouping::Entity::insert(model)
+        .on_conflict(
+            OnConflict::columns([
+                asset_grouping::Column::AssetId,
+                asset_grouping::Column::GroupKey,
+            ])
+            .update_columns([
+                asset_grouping::Column::GroupValue,
+                asset_grouping::Column::SlotUpdated,
+                asset_grouping::Column::GroupInfoSeq,
+            ])
+            .to_owned(),
+        )
+        .build(DbBackend::Postgres);
+
+    query.sql = format!(
+        "{} WHERE excluded.group_info_seq > asset_grouping.group_info_seq OR asset_grouping.group_info_seq IS NULL",
+        query.sql
+    );
+
+    txn.execute(query)
+        .await
+        .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;
+
+    Ok(())
+}
+
+pub async fn upsert_collection_verified<T>(
+    txn: &T,
+    asset_id: Vec<u8>,
+    verified: bool,
+    seq: i64,
 ) -> Result<(), IngesterError>
 where
     T: ConnectionTrait + TransactionTrait,
 {
- let update = asset_creators::Entity::update_many() - .filter( - Condition::all() - .add(asset_creators::Column::AssetId.eq(asset_id)) - .add(asset_creators::Column::Creator.eq(creator)) - .add(asset_creators::Column::Seq.lte(seq)), + let model = asset_grouping::ActiveModel { + asset_id: Set(asset_id), + group_key: Set("collection".to_string()), + verified: Set(verified), + seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset_grouping::Entity::insert(model) + .on_conflict( + OnConflict::columns([ + asset_grouping::Column::AssetId, + asset_grouping::Column::GroupKey, + ]) + .update_columns([ + asset_grouping::Column::Verified, + asset_grouping::Column::Seq, + ]) + .to_owned(), ) - .set(model); + .build(DbBackend::Postgres); - update.exec(txn).await.map_err(IngesterError::from)?; + query.sql = format!( + "{} WHERE excluded.seq > asset_grouping.seq OR asset_grouping.seq IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; Ok(()) } diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index 7cf2facbc..13c3d6e85 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -1,9 +1,9 @@ -use super::save_changelog_event; use crate::{ error::IngesterError, program_transformers::bubblegum::{ - upsert_asset_with_compression_info, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + save_changelog_event, upsert_asset_with_compression_info, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_collection_info, + upsert_collection_verified, }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; @@ -18,8 +18,7 @@ use blockbuster::{ use chrono::Utc; use digital_asset_types::{ dao::{ - asset, asset_authority, asset_creators, 
asset_data, asset_grouping, - asset_v1_account_attachments, + asset, asset_authority, asset_creators, asset_data, asset_v1_account_attachments, sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType}, }, json::ChainDataV1, @@ -232,43 +231,79 @@ where // Insert into `asset_creators` table. let creators = &metadata.creators; if !creators.is_empty() { - let mut db_creators = Vec::with_capacity(creators.len()); + // Vec to hold base creator information. + let mut db_creator_infos = Vec::with_capacity(creators.len()); + + // Vec to hold info on whether a creator is verified. This info is protected by `seq` number. + let mut db_creator_verified_infos = Vec::with_capacity(creators.len()); + + // Set to prevent duplicates. let mut creators_set = HashSet::new(); + for (i, c) in creators.iter().enumerate() { if creators_set.contains(&c.address) { continue; } - db_creators.push(asset_creators::ActiveModel { + db_creator_infos.push(asset_creators::ActiveModel { asset_id: Set(id_bytes.to_vec()), creator: Set(c.address.to_bytes().to_vec()), + position: Set(i as i16), share: Set(c.share as i32), + slot_updated: Set(Some(slot_i)), + ..Default::default() + }); + + db_creator_verified_infos.push(asset_creators::ActiveModel { + asset_id: Set(id_bytes.to_vec()), + creator: Set(c.address.to_bytes().to_vec()), verified: Set(c.verified), - seq: Set(seq as i64), // do we need this here @micheal-danenberg? - slot_updated: Set(slot_i), - position: Set(i as i16), + seq: Set(Some(seq as i64)), ..Default::default() }); + creators_set.insert(c.address); } - let query = asset_creators::Entity::insert_many(db_creators) + // This statement will update base information for each creator. 
+ let query = asset_creators::Entity::insert_many(db_creator_infos) .on_conflict( OnConflict::columns([ asset_creators::Column::AssetId, - asset_creators::Column::Position, + asset_creators::Column::Creator, ]) .update_columns([ - asset_creators::Column::Creator, + asset_creators::Column::Position, asset_creators::Column::Share, + asset_creators::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + txn.execute(query).await?; + + // This statement will update whether the creator is verified and the `seq` + // number. `seq` is used to protect the `verified` field, allowing for `mint` + // and `verifyCreator` to be processed out of order. + let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ asset_creators::Column::Verified, asset_creators::Column::Seq, - asset_creators::Column::SlotUpdated, ]) .to_owned(), ) .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.seq > asset_creators.seq OR asset_creators.seq IS NULL", + query.sql + ); txn.execute(query).await?; } + // Insert into `asset_authority` table. let model = asset_authority::ActiveModel { asset_id: Set(id_bytes.to_vec()), @@ -289,33 +324,22 @@ where .build(DbBackend::Postgres); txn.execute(query).await?; - // Insert into `asset_grouping` table. if let Some(c) = &metadata.collection { - if c.verified { - let model = asset_grouping::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - group_key: Set("collection".to_string()), - group_value: Set(c.key.to_string()), - seq: Set(seq as i64), // gummyroll seq - slot_updated: Set(slot_i), - ..Default::default() - }; + // Upsert into `asset_grouping` table with base collection info. 
+ upsert_collection_info( + txn, + id_bytes.to_vec(), + c.key.to_string(), + slot_i, + seq as i64, + ) + .await?; - // Do not attempt to modify any existing values: - // `ON CONFLICT ('asset_id') DO NOTHING`. - let query = asset_grouping::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .do_nothing() - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - } + // Partial update with whether collection is verified and the `seq` number. + upsert_collection_verified(txn, id_bytes.to_vec(), c.verified, seq as i64) + .await?; } + let mut task = DownloadMetadata { asset_data_id: id_bytes.to_vec(), uri: metadata.uri.clone(), diff --git a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs index 4ec329dd7..48bf3c37c 100644 --- a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs +++ b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs @@ -317,9 +317,9 @@ pub async fn save_v1_asset( let model = asset_grouping::ActiveModel { asset_id: Set(id.to_vec()), group_key: Set("collection".to_string()), - group_value: Set(c.key.to_string()), - seq: Set(0), - slot_updated: Set(slot_i), + group_value: Set(Some(c.key.to_string())), + seq: Set(Some(0)), + slot_updated: Set(Some(slot_i)), ..Default::default() }; let mut query = asset_grouping::Entity::insert(model) @@ -356,7 +356,7 @@ pub async fn save_v1_asset( ) .all(conn) .await?; - if existing_creators.len() > 0 { + if !existing_creators.is_empty() { let mut db_creators = Vec::with_capacity(creators.len()); for (i, c) in creators.into_iter().enumerate() { if creators_set.contains(&c.address) { @@ -367,8 +367,8 @@ pub async fn save_v1_asset( creator: Set(c.address.to_bytes().to_vec()), share: Set(c.share as i32), verified: Set(c.verified), - seq: Set(0), // do we need this here @micheal-danenberg? 
- slot_updated: Set(slot_i), + seq: Set(Some(0)), + slot_updated: Set(Some(slot_i)), position: Set(i as i16), ..Default::default() });