From 3ca1303d31eda8999764d90c4351d1b72b1a547d Mon Sep 17 00:00:00 2001
From: Michael Danenberg <56533526+danenbm@users.noreply.github.com>
Date: Wed, 26 Jul 2023 18:16:55 -0700
Subject: [PATCH] Change creator and collection verification to use upserts to
 support out of order (#87)

* Fix docker preparation script to build SPL

* Update owner and delegate in asset table when collection or creator verification occurs

* Modify program transformers to upsert in asset table

* This allows out-of-order Bubblegum transactions to create and update the asset table.

* Upsert leaf schema, owner, delegate, and seq separately since those are updated by all instructions and gated by sequence number to ensure freshest value.

* Mint, burn, and decompress happen without regard to sequence number because they operate on unique fields.

* Mint and burn have been updated but Decompress still needs to be fixed to handle out of order transactions.

* Also remove unused 'filling' variable.

* Update mint and decompress to be able to upsert asset info out of order

* Add second sequence number for compression status fields

* Reduce logging in docker

* Comment out compressed_seq before regenerating Sea ORM objects

* Add migration for asset specification

* Update README

* Rename PNFT and regenerate Sea ORM types

* Apply usage of compressed_seq after regenerating Sea ORM types

* Add owner delegate sequence number for owner and delegate fields.

Also remove not null constraints for asset fields without defaults.

* Regenerating database types

* Update handling for non null constrained asset table

* Update tests to use new Sea ORM types

* Use owner_and_delegate_seq to separate upserts

Also update redeem and decompress to not use leaf schema events.

* Adding was_decompressed flag to replace compressed_seq

compressed_seq won't work because decompression doesn't create a cl_event.

* Regenerating Sea ORM types

* Update code to use was_decompressed flag

* Fix new boolean SQL conditions

* Update comment

* Remove column updates in asset table during mint for items not in model

* Clippy fixes in ingester main

* Cleanup debug comment

* Allow for sequence number to be NULL (needed after decompress now)

* Add leaf specific sequence number to protect that field in asset table

* Revert "Allow for sequence number to be NULL (needed after decompress now)"

This reverts commit 2713a18ad2ddfac1944a9585774c45706933bf89.
* Update nft_ingester/src/program_transformers/bubblegum/redeem.rs Co-authored-by: Nicolas Pennie * Change creator verification to use upserts to support out of order * Remove null constraints on asset_creators table * Add null clause to upsert during mint * Rename creator vecs and add comments * Removing comment * Fix typo in migration down function * Fix collection verification and change to use upserts to support out of order processing (#90) * Adding verified flag to asset_grouping table * Regenerate Sea ORM types * Remove null constraints on asset_grouping table * Regenerate Sea ORM types * Update digital asset types and ingester based on new Sea ORM objects * Setting new verified flag in asset_grouping table to be non null with default Also regenerating Sea ORM types * Separate out collection insert in mintV1 into separate upserts * Fix error message * Separate update collection base info from collection verified * Add group info seq to asset_grouping table * Regenerate Sea ORM types * Add group_info_seq checks to collection base info upsert * Add check for verified = true in grouping for Read API * Fix conditions for asset grouping updates * Require grouping to verified to be returned from API in all cases --------- Co-authored-by: Nicolas Pennie --- das_api/src/api/api_impl.rs | 4 +- das_api/src/api/mod.rs | 2 +- das_api/src/main.rs | 4 +- .../src/dao/generated/asset.rs | 12 +- .../src/dao/generated/asset_creators.rs | 8 +- .../src/dao/generated/asset_grouping.rs | 18 ++- .../src/dao/generated/sea_orm_active_enums.rs | 52 +++---- digital_asset_types/src/dao/scopes/asset.rs | 10 +- digital_asset_types/src/dapi/common/asset.rs | 60 ++++---- digital_asset_types/tests/common.rs | 14 +- migration/src/lib.rs | 8 ++ ..._remove_asset_creators_null_constraints.rs | 44 ++++++ ...0720_120101_add_asset_grouping_verified.rs | 37 +++++ ..._remove_asset_grouping_null_constraints.rs | 46 ++++++ .../m20230724_120101_add_group_info_seq.rs | 36 +++++ nft_ingester/src/account_updates.rs | 7 +- .../bubblegum/collection_verification.rs | 54 +++---- .../bubblegum/creator_verification.rs | 33 ++--- .../src/program_transformers/bubblegum/db.rs | 134 +++++++++++++++--- .../program_transformers/bubblegum/mint_v1.rs | 100 ++++++++----- .../token_metadata/v1_asset.rs | 12 +- 21 files changed, 484 insertions(+), 211 deletions(-) create mode 100644 migration/src/m20230712_120101_remove_asset_creators_null_constraints.rs create mode 100644 migration/src/m20230720_120101_add_asset_grouping_verified.rs create mode 100644 migration/src/m20230720_130101_remove_asset_grouping_null_constraints.rs create mode 100644 migration/src/m20230724_120101_add_group_info_seq.rs diff --git a/das_api/src/api/api_impl.rs b/das_api/src/api/api_impl.rs index 8c981aaaa..ffdfa2d55 100644 --- a/das_api/src/api/api_impl.rs +++ b/das_api/src/api/api_impl.rs @@ -1,5 +1,3 @@ -use std::vec; - use digital_asset_types::{ dao::{ scopes::asset::get_grouping, @@ -356,7 +354,7 @@ impl ApiContract for DasApi { } = payload; let gs = get_grouping(&self.db_connection, group_key.clone(), group_value.clone()).await?; Ok(GetGroupingResponse { - group_key: group_key, + group_key, group_name: group_value, group_size: gs.size, }) diff --git a/das_api/src/api/mod.rs b/das_api/src/api/mod.rs index 34ba2d37e..0c5546e15 100644 --- a/das_api/src/api/mod.rs +++ b/das_api/src/api/mod.rs @@ -1,4 +1,4 @@ -use crate::{DasApiError, RpcModule}; +use crate::DasApiError; use async_trait::async_trait; use digital_asset_types::rpc::filter::SearchConditionType; use 
digital_asset_types::rpc::response::AssetList; diff --git a/das_api/src/main.rs b/das_api/src/main.rs index 5450072e9..368eddecb 100644 --- a/das_api/src/main.rs +++ b/das_api/src/main.rs @@ -4,7 +4,7 @@ mod config; mod error; mod validation; -use std::time::{Duration, Instant}; +use std::time::Instant; use { crate::api::DasApi, crate::builder::RpcApiBuilder, @@ -17,7 +17,7 @@ use { std::net::UdpSocket, }; -use hyper::{http, Method}; +use hyper::Method; use log::{debug, info}; use tower_http::cors::{Any, CorsLayer}; diff --git a/digital_asset_types/src/dao/generated/asset.rs b/digital_asset_types/src/dao/generated/asset.rs index 642ccf345..0ced69299 100644 --- a/digital_asset_types/src/dao/generated/asset.rs +++ b/digital_asset_types/src/dao/generated/asset.rs @@ -97,8 +97,8 @@ pub enum Relation { AssetData, AssetV1AccountAttachments, AssetCreators, - AssetGrouping, AssetAuthority, + AssetGrouping, } impl ColumnTrait for Column { @@ -148,8 +148,8 @@ impl RelationTrait for Relation { Entity::has_many(super::asset_v1_account_attachments::Entity).into() } Self::AssetCreators => Entity::has_many(super::asset_creators::Entity).into(), - Self::AssetGrouping => Entity::has_many(super::asset_grouping::Entity).into(), Self::AssetAuthority => Entity::has_many(super::asset_authority::Entity).into(), + Self::AssetGrouping => Entity::has_many(super::asset_grouping::Entity).into(), } } } @@ -172,15 +172,15 @@ impl Related for Entity { } } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::AssetGrouping.def() + Relation::AssetAuthority.def() } } -impl Related for Entity { +impl Related for Entity { fn to() -> RelationDef { - Relation::AssetAuthority.def() + Relation::AssetGrouping.def() } } diff --git a/digital_asset_types/src/dao/generated/asset_creators.rs b/digital_asset_types/src/dao/generated/asset_creators.rs index 68de902d5..21f34dcf7 100644 --- a/digital_asset_types/src/dao/generated/asset_creators.rs +++ b/digital_asset_types/src/dao/generated/asset_creators.rs @@ -19,8 +19,8 @@ pub struct Model { pub creator: Vec, pub share: i32, pub verified: bool, - pub seq: i64, - pub slot_updated: i64, + pub seq: Option, + pub slot_updated: Option, pub position: i16, } @@ -62,8 +62,8 @@ impl ColumnTrait for Column { Self::Creator => ColumnType::Binary.def(), Self::Share => ColumnType::Integer.def(), Self::Verified => ColumnType::Boolean.def(), - Self::Seq => ColumnType::BigInteger.def(), - Self::SlotUpdated => ColumnType::BigInteger.def(), + Self::Seq => ColumnType::BigInteger.def().null(), + Self::SlotUpdated => ColumnType::BigInteger.def().null(), Self::Position => ColumnType::SmallInteger.def(), } } diff --git a/digital_asset_types/src/dao/generated/asset_grouping.rs b/digital_asset_types/src/dao/generated/asset_grouping.rs index d0c547640..5d5c0e749 100644 --- a/digital_asset_types/src/dao/generated/asset_grouping.rs +++ b/digital_asset_types/src/dao/generated/asset_grouping.rs @@ -17,9 +17,11 @@ pub struct Model { pub id: i64, pub asset_id: Vec, pub group_key: String, - pub group_value: String, - pub seq: i64, - pub slot_updated: i64, + pub group_value: Option, + pub seq: Option, + pub slot_updated: Option, + pub verified: bool, + pub group_info_seq: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] @@ -30,6 +32,8 @@ pub enum Column { GroupValue, Seq, SlotUpdated, + Verified, + GroupInfoSeq, } #[derive(Copy, Clone, Debug, EnumIter, DerivePrimaryKey)] @@ -56,9 +60,11 @@ impl ColumnTrait for Column { Self::Id => ColumnType::BigInteger.def(), 
Self::AssetId => ColumnType::Binary.def(), Self::GroupKey => ColumnType::Text.def(), - Self::GroupValue => ColumnType::Text.def(), - Self::Seq => ColumnType::BigInteger.def(), - Self::SlotUpdated => ColumnType::BigInteger.def(), + Self::GroupValue => ColumnType::Text.def().null(), + Self::Seq => ColumnType::BigInteger.def().null(), + Self::SlotUpdated => ColumnType::BigInteger.def().null(), + Self::Verified => ColumnType::Boolean.def(), + Self::GroupInfoSeq => ColumnType::BigInteger.def().null(), } } } diff --git a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs index 5fb406acc..2be0283e7 100644 --- a/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs +++ b/digital_asset_types/src/dao/generated/sea_orm_active_enums.rs @@ -14,16 +14,6 @@ pub enum Mutability { Unknown, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] -pub enum ChainMutability { - #[sea_orm(string_value = "immutable")] - Immutable, - #[sea_orm(string_value = "mutable")] - Mutable, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -54,6 +44,22 @@ pub enum TaskStatus { Success, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm( + rs_type = "String", + db_type = "Enum", + enum_name = "royalty_target_type" +)] +pub enum RoyaltyTargetType { + #[sea_orm(string_value = "creators")] + Creators, + #[sea_orm(string_value = "fanout")] + Fanout, + #[sea_orm(string_value = "single")] + Single, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -82,6 +88,16 @@ pub enum SpecificationAssetClass { Unknown, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "chain_mutability")] +pub enum ChainMutability { + #[sea_orm(string_value = "immutable")] + Immutable, + #[sea_orm(string_value = "mutable")] + Mutable, + #[sea_orm(string_value = "unknown")] + Unknown, +} +#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm( rs_type = "String", db_type = "Enum", @@ -98,22 +114,6 @@ pub enum SpecificationVersions { V2, } #[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] -#[sea_orm( - rs_type = "String", - db_type = "Enum", - enum_name = "royalty_target_type" -)] -pub enum RoyaltyTargetType { - #[sea_orm(string_value = "creators")] - Creators, - #[sea_orm(string_value = "fanout")] - Fanout, - #[sea_orm(string_value = "single")] - Single, - #[sea_orm(string_value = "unknown")] - Unknown, -} -#[derive(Debug, Clone, PartialEq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] #[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "owner_type")] pub enum OwnerType { #[sea_orm(string_value = "single")] diff --git a/digital_asset_types/src/dao/scopes/asset.rs b/digital_asset_types/src/dao/scopes/asset.rs index 43a499fae..56ddcd978 100644 --- a/digital_asset_types/src/dao/scopes/asset.rs +++ b/digital_asset_types/src/dao/scopes/asset.rs @@ -65,7 +65,8 @@ pub async fn get_grouping( .filter( Condition::all() 
.add(asset_grouping::Column::GroupKey.eq(group_key)) - .add(asset_grouping::Column::GroupValue.eq(group_value)), + .add(asset_grouping::Column::GroupValue.eq(group_value)) + .add(asset_grouping::Column::Verified.eq(true)), ) .count(conn) .await?; @@ -83,7 +84,8 @@ pub async fn get_by_grouping( ) -> Result, DbErr> { let condition = asset_grouping::Column::GroupKey .eq(group_key) - .and(asset_grouping::Column::GroupValue.eq(group_value)); + .and(asset_grouping::Column::GroupValue.eq(group_value)) + .and(asset_grouping::Column::Verified.eq(true)); get_by_related_condition( conn, Condition::all() @@ -192,7 +194,7 @@ pub async fn get_related_for_assets( { let id = asset.id.clone(); let fa = FullAsset { - asset: asset, + asset, data: ad.clone(), authorities: vec![], creators: vec![], @@ -228,6 +230,7 @@ pub async fn get_related_for_assets( let grouping = asset_grouping::Entity::find() .filter(asset_grouping::Column::AssetId.is_in(ids.clone())) + .filter(asset_grouping::Column::Verified.eq(true)) .order_by_asc(asset_grouping::Column::AssetId) .all(conn) .await?; @@ -288,6 +291,7 @@ pub async fn get_by_id( .await?; let grouping: Vec = asset_grouping::Entity::find() .filter(asset_grouping::Column::AssetId.eq(asset.id.clone())) + .filter(asset_grouping::Column::Verified.eq(true)) .all(conn) .await?; Ok(FullAsset { diff --git a/digital_asset_types/src/dapi/common/asset.rs b/digital_asset_types/src/dapi/common/asset.rs index 917c2f105..43f01856b 100644 --- a/digital_asset_types/src/dapi/common/asset.rs +++ b/digital_asset_types/src/dapi/common/asset.rs @@ -2,15 +2,14 @@ use crate::dao::sea_orm_active_enums::SpecificationVersions; use crate::dao::FullAsset; use crate::dao::Pagination; use crate::dao::{asset, asset_authority, asset_creators, asset_data, asset_grouping}; - use crate::rpc::filter::{AssetSortBy, AssetSortDirection, AssetSorting}; use crate::rpc::response::{AssetError, AssetList}; use crate::rpc::{ Asset as RpcAsset, Authority, Compression, Content, Creator, File, Group, Interface, MetadataMap, Ownership, Royalty, Scope, Supply, Uses, }; - use jsonpath_lib::JsonPathError; +use log::warn; use mime_guess::Mime; use sea_orm::DbErr; use serde_json::Value; @@ -18,8 +17,6 @@ use std::collections::HashMap; use std::path::Path; use url::Url; -use log::{debug, info, warn}; - pub fn to_uri(uri: String) -> Option { Url::parse(&*uri).ok() } @@ -30,7 +27,9 @@ pub fn get_mime(url: Url) -> Option { pub fn get_mime_type_from_uri(uri: String) -> String { let default_mime_type = "image/png".to_string(); - to_uri(uri).and_then(get_mime).map_or(default_mime_type, |m| m.to_string()) + to_uri(uri) + .and_then(get_mime) + .map_or(default_mime_type, |m| m.to_string()) } pub fn file_from_str(str: String) -> File { @@ -173,28 +172,25 @@ pub fn v1_content_from_json(asset_data: &asset_data::Model) -> Result { if let Some(str_uri) = u.as_str() { - let file = - if let Some(str_mime) = m.as_str() { - File { - uri: Some(str_uri.to_string()), - mime: Some(str_mime.to_string()), - quality: None, - contexts: None, - } - } else { - warn!("Mime is not string: {:?}", m); - file_from_str(str_uri.to_string()) - }; - actual_files.insert( - str_uri.to_string().clone(), - file, - ); + let file = if let Some(str_mime) = m.as_str() { + File { + uri: Some(str_uri.to_string()), + mime: Some(str_mime.to_string()), + quality: None, + contexts: None, + } + } else { + warn!("Mime is not string: {:?}", m); + file_from_str(str_uri.to_string()) + }; + actual_files.insert(str_uri.to_string().clone(), file); } else { warn!("URI is not string: 
{:?}", u); } } (Some(u), None) => { - let str_uri = serde_json::to_string(u).unwrap_or_else(|_|String::new()); + let str_uri = + serde_json::to_string(u).unwrap_or_else(|_| String::new()); actual_files.insert(str_uri.clone(), file_from_str(str_uri)); } _ => {} @@ -250,14 +246,18 @@ pub fn to_creators(creators: Vec) -> Vec { .collect() } -pub fn to_grouping(groups: Vec) -> Vec { - groups - .iter() - .map(|a| Group { - group_key: a.group_key.clone(), - group_value: a.group_value.clone(), +pub fn to_grouping(groups: Vec) -> Result, DbErr> { + fn find_group(model: &asset_grouping::Model) -> Result { + Ok(Group { + group_key: model.group_key.clone(), + group_value: model + .group_value + .clone() + .ok_or(DbErr::Custom("Group value not found".to_string()))?, }) - .collect() + } + + groups.iter().map(find_group).collect() } pub fn get_interface(asset: &asset::Model) -> Result { @@ -286,7 +286,7 @@ pub fn asset_to_rpc(asset: FullAsset) -> Result { } = asset; let rpc_authorities = to_authority(authorities); let rpc_creators = to_creators(creators); - let rpc_groups = to_grouping(groups); + let rpc_groups = to_grouping(groups)?; let interface = get_interface(&asset)?; let content = get_content(&asset, &data)?; let mut chain_data_selector_fn = jsonpath_lib::selector(&data.chain_data); diff --git a/digital_asset_types/tests/common.rs b/digital_asset_types/tests/common.rs index 9bb28a006..653b61e7b 100644 --- a/digital_asset_types/tests/common.rs +++ b/digital_asset_types/tests/common.rs @@ -179,8 +179,8 @@ pub fn create_asset_creator( creator, share, verified, - seq: 0, - slot_updated: 0, + seq: Some(0), + slot_updated: Some(0), position: 0, }, ) @@ -218,16 +218,18 @@ pub fn create_asset_grouping( asset_grouping::ActiveModel { asset_id: Set(asset_id.clone()), group_key: Set(String::from("collection")), - group_value: Set(bs58::encode(collection).into_string()), + group_value: Set(Some(bs58::encode(collection).into_string())), ..Default::default() }, asset_grouping::Model { asset_id, - group_value: bs58::encode(collection).into_string(), - seq: 0, + group_value: Some(bs58::encode(collection).into_string()), + seq: Some(0), id: row_num, group_key: "collection".to_string(), - slot_updated: 0, + slot_updated: Some(0), + verified: false, + group_info_seq: Some(0), }, ) } diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 6edefc669..11e8b5d0f 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -21,6 +21,10 @@ mod m20230601_120101_add_pnft_enum_val; mod m20230615_120101_remove_asset_null_constraints; mod m20230620_120101_add_was_decompressed; mod m20230623_120101_add_leaf_sequence_number; +mod m20230712_120101_remove_asset_creators_null_constraints; +mod m20230720_120101_add_asset_grouping_verified; +mod m20230720_130101_remove_asset_grouping_null_constraints; +mod m20230724_120101_add_group_info_seq; pub struct Migrator; @@ -49,6 +53,10 @@ impl MigratorTrait for Migrator { Box::new(m20230615_120101_remove_asset_null_constraints::Migration), Box::new(m20230620_120101_add_was_decompressed::Migration), Box::new(m20230623_120101_add_leaf_sequence_number::Migration), + Box::new(m20230712_120101_remove_asset_creators_null_constraints::Migration), + Box::new(m20230720_120101_add_asset_grouping_verified::Migration), + Box::new(m20230720_130101_remove_asset_grouping_null_constraints::Migration), + Box::new(m20230724_120101_add_group_info_seq::Migration), ] } } diff --git a/migration/src/m20230712_120101_remove_asset_creators_null_constraints.rs 
b/migration/src/m20230712_120101_remove_asset_creators_null_constraints.rs new file mode 100644 index 000000000..73251b0a4 --- /dev/null +++ b/migration/src/m20230712_120101_remove_asset_creators_null_constraints.rs @@ -0,0 +1,44 @@ +use sea_orm_migration::{ + prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_creators + ALTER COLUMN seq DROP NOT NULL, + ALTER COLUMN slot_updated DROP NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_creators + ALTER COLUMN seq SET NOT NULL, + ALTER COLUMN slot_updated SET NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } +} diff --git a/migration/src/m20230720_120101_add_asset_grouping_verified.rs b/migration/src/m20230720_120101_add_asset_grouping_verified.rs new file mode 100644 index 000000000..8f1cfd4cc --- /dev/null +++ b/migration/src/m20230720_120101_add_asset_grouping_verified.rs @@ -0,0 +1,37 @@ +use digital_asset_types::dao::asset_grouping; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset_grouping::Entity) + .add_column( + ColumnDef::new(Alias::new("verified")) + .boolean() + .not_null() + .default(false), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset_grouping::Entity) + .drop_column(Alias::new("verified")) + .to_owned(), + ) + .await + } +} diff --git a/migration/src/m20230720_130101_remove_asset_grouping_null_constraints.rs b/migration/src/m20230720_130101_remove_asset_grouping_null_constraints.rs new file mode 100644 index 000000000..debefecf2 --- /dev/null +++ b/migration/src/m20230720_130101_remove_asset_grouping_null_constraints.rs @@ -0,0 +1,46 @@ +use sea_orm_migration::{ + prelude::*, + sea_orm::{ConnectionTrait, DatabaseBackend, Statement}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_grouping + ALTER COLUMN group_value DROP NOT NULL, + ALTER COLUMN seq DROP NOT NULL, + ALTER COLUMN slot_updated DROP NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + " + ALTER TABLE asset_grouping + ALTER COLUMN group_value SET NOT NULL, + ALTER COLUMN seq SET NOT NULL, + ALTER COLUMN slot_updated SET NOT NULL; + " + .to_string(), + )) + .await?; + + Ok(()) + } +} diff --git 
a/migration/src/m20230724_120101_add_group_info_seq.rs b/migration/src/m20230724_120101_add_group_info_seq.rs new file mode 100644 index 000000000..89f2414a5 --- /dev/null +++ b/migration/src/m20230724_120101_add_group_info_seq.rs @@ -0,0 +1,36 @@ +use digital_asset_types::dao::asset_grouping; +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset_grouping::Entity) + .add_column(ColumnDef::new(Alias::new("group_info_seq")).big_integer()) + .to_owned(), + ) + .await?; + + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Replace the sample below with your own migration scripts + manager + .alter_table( + Table::alter() + .table(asset_grouping::Entity) + .drop_column(Alias::new("group_info_seq")) + .to_owned(), + ) + .await?; + + Ok(()) + } +} diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs index f25933fd4..aa22ee54e 100644 --- a/nft_ingester/src/account_updates.rs +++ b/nft_ingester/src/account_updates.rs @@ -1,10 +1,7 @@ -use std::{ - sync::Arc, -}; +use std::sync::Arc; use crate::{ - metric, metrics::capture_result, - program_transformers::ProgramTransformer, tasks::TaskData, + metric, metrics::capture_result, program_transformers::ProgramTransformer, tasks::TaskData, }; use cadence_macros::{is_global_default_set, statsd_count, statsd_time}; use chrono::Utc; diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index 46aae4b54..b1acae63b 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -2,15 +2,15 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ save_changelog_event, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_collection_info, + upsert_collection_verified, }, }; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, }; -use digital_asset_types::dao::asset_grouping; -use sea_orm::{entity::*, query::*, sea_query::OnConflict, DbBackend, Set}; +use sea_orm::query::*; pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, @@ -63,40 +63,22 @@ where upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; - if verify { - if let Some(Payload::SetAndVerifyCollection { collection }) = - parsing_result.payload - { - let grouping = asset_grouping::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - group_key: Set("collection".to_string()), - group_value: Set(collection.to_string()), - seq: Set(seq as i64), - slot_updated: Set(bundle.slot as i64), - ..Default::default() - }; - let mut query = asset_grouping::Entity::insert(grouping) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .update_columns([ - asset_grouping::Column::GroupKey, - asset_grouping::Column::GroupValue, - asset_grouping::Column::Seq, - asset_grouping::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = 
format!( - "{} WHERE excluded.slot_updated > asset_grouping.slot_updated AND excluded.seq >= asset_grouping.seq", - query.sql - ); - txn.execute(query).await?; - } + if let Some(Payload::SetAndVerifyCollection { collection }) = parsing_result.payload + { + // Upsert into `asset_grouping` table with base collection info. + upsert_collection_info( + txn, + id_bytes.to_vec(), + collection.to_string(), + bundle.slot as i64, + seq as i64, + ) + .await?; } + + // Partial update with whether collection is verified and the `seq` number. + upsert_collection_verified(txn, id_bytes.to_vec(), verify, seq as i64).await?; + id_bytes } _ => return Err(IngesterError::NotImplemented), diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index a8cc73077..b40006401 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -1,19 +1,15 @@ -use blockbuster::{ - instruction::InstructionBundle, - programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, -}; -use digital_asset_types::dao::asset_creators; -use sea_orm::{ConnectionTrait, Set, TransactionTrait}; - use crate::{ error::IngesterError, program_transformers::bubblegum::{ - update_creator, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, - upsert_asset_with_seq, + save_changelog_event, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_creator_verified, }, }; - -use super::save_changelog_event; +use blockbuster::{ + instruction::InstructionBundle, + programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, +}; +use sea_orm::{ConnectionTrait, TransactionTrait}; pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, @@ -82,21 +78,12 @@ where _ => return Err(IngesterError::NotImplemented), }; - // The primary key `id` is not required here since `update_creator` uses `update_many` - // for the time being. 
- let creator_to_update = asset_creators::ActiveModel { - //id: Unchanged(14), - verified: Set(value), - seq: Set(seq as i64), - ..Default::default() - }; - - update_creator( + upsert_creator_verified( txn, asset_id_bytes, creator.to_bytes().to_vec(), - seq, - creator_to_update, + value, + seq as i64, ) .await?; diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 1b5975f23..0ba5e7b06 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -1,5 +1,5 @@ use crate::error::IngesterError; -use digital_asset_types::dao::{asset, asset_creators, backfill_items, cl_items}; +use digital_asset_types::dao::{asset, asset_creators, asset_grouping, backfill_items, cl_items}; use log::{debug, info}; use sea_orm::{ query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, @@ -264,30 +264,132 @@ where Ok(()) } -pub async fn update_creator( +pub async fn upsert_creator_verified( txn: &T, asset_id: Vec, creator: Vec, - seq: u64, - model: asset_creators::ActiveModel, + verified: bool, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset_creators::ActiveModel { + asset_id: Set(asset_id), + creator: Set(creator), + verified: Set(verified), + seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset_creators::Entity::insert(model) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ + asset_creators::Column::Verified, + asset_creators::Column::Seq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + query.sql = format!("{} WHERE excluded.seq > asset_creators.seq", query.sql); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_collection_info( + txn: &T, + asset_id: Vec, + group_value: String, + slot_updated: i64, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + let model = asset_grouping::ActiveModel { + asset_id: Set(asset_id), + group_key: Set("collection".to_string()), + group_value: Set(Some(group_value)), + slot_updated: Set(Some(slot_updated)), + group_info_seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset_grouping::Entity::insert(model) + .on_conflict( + OnConflict::columns([ + asset_grouping::Column::AssetId, + asset_grouping::Column::GroupKey, + ]) + .update_columns([ + asset_grouping::Column::GroupValue, + asset_grouping::Column::SlotUpdated, + asset_grouping::Column::GroupInfoSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + + query.sql = format!( + "{} WHERE excluded.group_info_seq > asset_grouping.group_info_seq OR asset_grouping.group_info_seq IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; + + Ok(()) +} + +pub async fn upsert_collection_verified( + txn: &T, + asset_id: Vec, + verified: bool, + seq: i64, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { - // Using `update_many` to avoid having to supply the primary key as well within `model`. - // We still effectively end up updating a single row at most, which is uniquely identified - // by the `(asset_id, creator)` pair. Is there any reason why we should not use - // `update_many` here? 
- let update = asset_creators::Entity::update_many() - .filter( - Condition::all() - .add(asset_creators::Column::AssetId.eq(asset_id)) - .add(asset_creators::Column::Creator.eq(creator)) - .add(asset_creators::Column::Seq.lte(seq)), + let model = asset_grouping::ActiveModel { + asset_id: Set(asset_id), + group_key: Set("collection".to_string()), + verified: Set(verified), + seq: Set(Some(seq)), + ..Default::default() + }; + + let mut query = asset_grouping::Entity::insert(model) + .on_conflict( + OnConflict::columns([ + asset_grouping::Column::AssetId, + asset_grouping::Column::GroupKey, + ]) + .update_columns([ + asset_grouping::Column::Verified, + asset_grouping::Column::Seq, + ]) + .to_owned(), ) - .set(model); + .build(DbBackend::Postgres); - update.exec(txn).await.map_err(IngesterError::from)?; + query.sql = format!( + "{} WHERE excluded.seq > asset_grouping.seq OR asset_grouping.seq IS NULL", + query.sql + ); + + txn.execute(query) + .await + .map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?; Ok(()) } diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index 7cf2facbc..13c3d6e85 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -1,9 +1,9 @@ -use super::save_changelog_event; use crate::{ error::IngesterError, program_transformers::bubblegum::{ - upsert_asset_with_compression_info, upsert_asset_with_leaf_info, - upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, + save_changelog_event, upsert_asset_with_compression_info, upsert_asset_with_leaf_info, + upsert_asset_with_owner_and_delegate_info, upsert_asset_with_seq, upsert_collection_info, + upsert_collection_verified, }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; @@ -18,8 +18,7 @@ use blockbuster::{ use chrono::Utc; use digital_asset_types::{ dao::{ - asset, asset_authority, asset_creators, asset_data, asset_grouping, - asset_v1_account_attachments, + asset, asset_authority, asset_creators, asset_data, asset_v1_account_attachments, sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType}, }, json::ChainDataV1, @@ -232,43 +231,79 @@ where // Insert into `asset_creators` table. let creators = &metadata.creators; if !creators.is_empty() { - let mut db_creators = Vec::with_capacity(creators.len()); + // Vec to hold base creator information. + let mut db_creator_infos = Vec::with_capacity(creators.len()); + + // Vec to hold info on whether a creator is verified. This info is protected by `seq` number. + let mut db_creator_verified_infos = Vec::with_capacity(creators.len()); + + // Set to prevent duplicates. let mut creators_set = HashSet::new(); + for (i, c) in creators.iter().enumerate() { if creators_set.contains(&c.address) { continue; } - db_creators.push(asset_creators::ActiveModel { + db_creator_infos.push(asset_creators::ActiveModel { asset_id: Set(id_bytes.to_vec()), creator: Set(c.address.to_bytes().to_vec()), + position: Set(i as i16), share: Set(c.share as i32), + slot_updated: Set(Some(slot_i)), + ..Default::default() + }); + + db_creator_verified_infos.push(asset_creators::ActiveModel { + asset_id: Set(id_bytes.to_vec()), + creator: Set(c.address.to_bytes().to_vec()), verified: Set(c.verified), - seq: Set(seq as i64), // do we need this here @micheal-danenberg? 
- slot_updated: Set(slot_i), - position: Set(i as i16), + seq: Set(Some(seq as i64)), ..Default::default() }); + creators_set.insert(c.address); } - let query = asset_creators::Entity::insert_many(db_creators) + // This statement will update base information for each creator. + let query = asset_creators::Entity::insert_many(db_creator_infos) .on_conflict( OnConflict::columns([ asset_creators::Column::AssetId, - asset_creators::Column::Position, + asset_creators::Column::Creator, ]) .update_columns([ - asset_creators::Column::Creator, + asset_creators::Column::Position, asset_creators::Column::Share, + asset_creators::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + txn.execute(query).await?; + + // This statement will update whether the creator is verified and the `seq` + // number. `seq` is used to protect the `verified` field, allowing for `mint` + // and `verifyCreator` to be processed out of order. + let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ asset_creators::Column::Verified, asset_creators::Column::Seq, - asset_creators::Column::SlotUpdated, ]) .to_owned(), ) .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.seq > asset_creators.seq OR asset_creators.seq IS NULL", + query.sql + ); txn.execute(query).await?; } + // Insert into `asset_authority` table. let model = asset_authority::ActiveModel { asset_id: Set(id_bytes.to_vec()), @@ -289,33 +324,22 @@ where .build(DbBackend::Postgres); txn.execute(query).await?; - // Insert into `asset_grouping` table. if let Some(c) = &metadata.collection { - if c.verified { - let model = asset_grouping::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - group_key: Set("collection".to_string()), - group_value: Set(c.key.to_string()), - seq: Set(seq as i64), // gummyroll seq - slot_updated: Set(slot_i), - ..Default::default() - }; + // Upsert into `asset_grouping` table with base collection info. + upsert_collection_info( + txn, + id_bytes.to_vec(), + c.key.to_string(), + slot_i, + seq as i64, + ) + .await?; - // Do not attempt to modify any existing values: - // `ON CONFLICT ('asset_id') DO NOTHING`. - let query = asset_grouping::Entity::insert(model) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .do_nothing() - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - } + // Partial update with whether collection is verified and the `seq` number. 
+ upsert_collection_verified(txn, id_bytes.to_vec(), c.verified, seq as i64) + .await?; } + let mut task = DownloadMetadata { asset_data_id: id_bytes.to_vec(), uri: metadata.uri.clone(), diff --git a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs index 4ec329dd7..48bf3c37c 100644 --- a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs +++ b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs @@ -317,9 +317,9 @@ pub async fn save_v1_asset( let model = asset_grouping::ActiveModel { asset_id: Set(id.to_vec()), group_key: Set("collection".to_string()), - group_value: Set(c.key.to_string()), - seq: Set(0), - slot_updated: Set(slot_i), + group_value: Set(Some(c.key.to_string())), + seq: Set(Some(0)), + slot_updated: Set(Some(slot_i)), ..Default::default() }; let mut query = asset_grouping::Entity::insert(model) @@ -356,7 +356,7 @@ pub async fn save_v1_asset( ) .all(conn) .await?; - if existing_creators.len() > 0 { + if !existing_creators.is_empty() { let mut db_creators = Vec::with_capacity(creators.len()); for (i, c) in creators.into_iter().enumerate() { if creators_set.contains(&c.address) { @@ -367,8 +367,8 @@ pub async fn save_v1_asset( creator: Set(c.address.to_bytes().to_vec()), share: Set(c.share as i32), verified: Set(c.verified), - seq: Set(0), // do we need this here @micheal-danenberg? - slot_updated: Set(slot_i), + seq: Set(Some(0)), + slot_updated: Set(Some(slot_i)), position: Set(i as i16), ..Default::default() });
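
Note on the upsert pattern introduced in nft_ingester/src/program_transformers/bubblegum/db.rs: each new helper builds a SeaORM insert with ON CONFLICT DO UPDATE and then appends a WHERE clause so the update only takes effect when the incoming sequence number is newer than what is already stored. As a rough illustration (not an exact dump of the SQL SeaORM generates), the statement built by upsert_creator_verified comes out approximately as:

    INSERT INTO asset_creators (asset_id, creator, verified, seq)
    VALUES ($1, $2, $3, $4)
    ON CONFLICT (asset_id, creator)
    DO UPDATE SET verified = excluded.verified, seq = excluded.seq
    WHERE excluded.seq > asset_creators.seq;

Because the row is keyed on (asset_id, creator) and the verified column is gated on seq, a verifyCreator that is processed before its mintV1 still inserts a row, and the later-arriving mint's lower sequence number cannot overwrite the newer verified value. upsert_collection_info and upsert_collection_verified follow the same shape against asset_grouping, gated on group_info_seq and seq respectively.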