diff --git a/Cargo.lock b/Cargo.lock index a9028d929..7fe2160d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1840,8 +1840,8 @@ version = "0.7.2" dependencies = [ "async-trait", "blockbuster", - "borsh 0.9.3", - "borsh-derive 0.9.3", + "borsh 0.10.3", + "borsh-derive 0.10.3", "bs58 0.4.0", "futures", "indexmap 1.9.3", diff --git a/Cargo.toml b/Cargo.toml index 75f64a41f..376aff9ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ async-trait = "0.1.60" base64 = "0.21.0" blockbuster = "=0.9.0-beta.5" borsh = "~0.10.3" +borsh-derive = "~0.10.3" bs58 = "0.4.0" cadence = "0.29.0" cadence-macros = "0.29.0" diff --git a/digital_asset_types/Cargo.toml b/digital_asset_types/Cargo.toml index 314073d9f..f7cab08ee 100644 --- a/digital_asset_types/Cargo.toml +++ b/digital_asset_types/Cargo.toml @@ -8,8 +8,8 @@ publish = { workspace = true } [dependencies] async-trait = { workspace = true } blockbuster = { workspace = true } -borsh = { version = "0.9.3", optional = true } -borsh-derive = { version = "0.9.3", optional = true } +borsh = { workspace = true, optional = true } +borsh-derive = { workspace = true, optional = true } bs58 = { workspace = true } futures = { workspace = true } indexmap = { workspace = true } diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 17c482976..f3b1752b6 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -35,6 +35,7 @@ mod m20231206_120101_remove_was_decompressed; mod m20240104_203133_add_cl_audits_v2; mod m20240104_203328_remove_cl_audits; mod m20240116_130744_add_update_metadata_ix; +mod m20240117_120101_alter_creator_indices; pub mod model; @@ -79,6 +80,7 @@ impl MigratorTrait for Migrator { Box::new(m20240104_203133_add_cl_audits_v2::Migration), Box::new(m20240104_203328_remove_cl_audits::Migration), Box::new(m20240116_130744_add_update_metadata_ix::Migration), + Box::new(m20240117_120101_alter_creator_indices::Migration), ] } } diff --git a/migration/src/m20240117_120101_alter_creator_indices.rs b/migration/src/m20240117_120101_alter_creator_indices.rs new file mode 100644 index 000000000..72c0e7101 --- /dev/null +++ b/migration/src/m20240117_120101_alter_creator_indices.rs @@ -0,0 +1,81 @@ +use sea_orm_migration::prelude::*; + +use crate::model::table::AssetCreators; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // New index to serve as a replacement for "asset_creator_unique" when someone queries GetAssetsByCreator. + // We add it before deleting the "asset_creator_unique" index to ensure no customer impact. + manager + .create_index( + Index::create() + .name("asset_creator_verified") + .table(AssetCreators::Table) + .col(AssetCreators::AssetId) + .col(AssetCreators::Creator) + .col(AssetCreators::Verified) + .to_owned(), + ) + .await?; + + // We no longer want to enforce uniques on the (asset_id, creator) pairs. + // We may end up with duplicate (asset_id, creator) pairs during indexing, because a creator can change position. + // Any stale rows (older seq/slot_updated) will be ignored, meaning the API users will never see duplicate creators. + manager + .drop_index( + sea_query::Index::drop() + .name("asset_creator_unique") + .table(AssetCreators::Table) + .to_owned(), + ) + .await?; + + // This index is unused and can be removed. + manager + .drop_index( + sea_query::Index::drop() + .name("asset_verified_creator") + .table(AssetCreators::Table) + .to_owned(), + ) + .await?; + Ok(()) + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_index( + Index::create() + .unique() + .name("asset_creator_unique") + .table(AssetCreators::Table) + .col(AssetCreators::AssetId) + .col(AssetCreators::Creator) + .to_owned(), + ) + .await?; + manager + .create_index( + Index::create() + .name("asset_verified_creator") + .table(AssetCreators::Table) + .col(AssetCreators::AssetId) + .col(AssetCreators::Verified) + .to_owned(), + ) + .await?; + manager + .drop_index( + sea_query::Index::drop() + .name("asset_creator_verified") + .table(AssetCreators::Table) + .to_owned(), + ) + .await?; + Ok(()) + } +} diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index b3b1070bd..7e5aafc83 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -532,13 +532,10 @@ pub async fn upsert_asset_creators( where T: ConnectionTrait + TransactionTrait, { - // Vec to hold base creator information. - let mut db_creators = Vec::with_capacity(creators.len()); - - if creators.is_empty() { - // Bubblegum supports empty creator array. In this case insert an empty Vec - // for the creator. - db_creators.push(asset_creators::ActiveModel { + let db_creators = if creators.is_empty() { + // If creators are empty, insert an empty creator with the current sequence. + // This prevents accidental errors during out-of-order updates. + vec![asset_creators::ActiveModel { asset_id: Set(id.clone()), position: Set(0), creator: Set(vec![]), @@ -547,17 +544,12 @@ where slot_updated: Set(Some(slot_updated)), seq: Set(Some(seq)), ..Default::default() - }); + }] } else { - // Set to prevent duplicates. - let mut creators_set = HashSet::new(); - - for (i, c) in creators.iter().enumerate() { - if creators_set.contains(&c.address) { - continue; - } - - db_creators.push(asset_creators::ActiveModel { + creators + .iter() + .enumerate() + .map(|(i, c)| asset_creators::ActiveModel { asset_id: Set(id.clone()), position: Set(i as i16), creator: Set(c.address.to_bytes().to_vec()), @@ -566,11 +558,9 @@ where slot_updated: Set(Some(slot_updated)), seq: Set(Some(seq)), ..Default::default() - }); - - creators_set.insert(c.address); - } - } + }) + .collect() + }; // This statement will update base information for each creator. let mut query = asset_creators::Entity::insert_many(db_creators) @@ -583,8 +573,8 @@ where asset_creators::Column::Creator, asset_creators::Column::Share, asset_creators::Column::Verified, - asset_creators::Column::SlotUpdated, asset_creators::Column::Seq, + asset_creators::Column::SlotUpdated, ]) .to_owned(), ) diff --git a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs index 5c0f1e9c9..8533c8949 100644 --- a/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs +++ b/nft_ingester/src/program_transformers/token_metadata/v1_asset.rs @@ -380,52 +380,47 @@ pub async fn save_v1_asset( .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; } - let creators = data.creators.unwrap_or_default(); - if !creators.is_empty() { - let mut creators_set = HashSet::new(); - let mut db_creators = Vec::with_capacity(creators.len()); - for (i, c) in creators.into_iter().enumerate() { - if creators_set.contains(&c.address) { - continue; - } - db_creators.push(asset_creators::ActiveModel { - asset_id: Set(mint_pubkey_vec.clone()), - position: Set(i as i16), - creator: Set(c.address.to_bytes().to_vec()), - share: Set(c.share as i32), - verified: Set(c.verified), - slot_updated: Set(Some(slot_i)), - seq: Set(Some(0)), - ..Default::default() - }); - creators_set.insert(c.address); - } + let creators = data + .creators + .unwrap_or_default() + .iter() + .enumerate() + .map(|(i, creator)| asset_creators::ActiveModel { + asset_id: Set(mint_pubkey_vec.clone()), + position: Set(i as i16), + creator: Set(creator.address.to_bytes().to_vec()), + share: Set(creator.share as i32), + verified: Set(creator.verified), + slot_updated: Set(Some(slot_i)), + seq: Set(Some(0)), + ..Default::default() + }) + .collect::>(); - if !db_creators.is_empty() { - let mut query = asset_creators::Entity::insert_many(db_creators) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Position, - ]) - .update_columns([ - asset_creators::Column::Creator, - asset_creators::Column::Share, - asset_creators::Column::Verified, - asset_creators::Column::Seq, - asset_creators::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.slot_updated > asset_creators.slot_updated", + if !creators.is_empty() { + let mut query = asset_creators::Entity::insert_many(creators) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Position, + ]) + .update_columns([ + asset_creators::Column::Creator, + asset_creators::Column::Share, + asset_creators::Column::Verified, + asset_creators::Column::Seq, + asset_creators::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL", query.sql ); - txn.execute(query) - .await - .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; - } + txn.execute(query) + .await + .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; } txn.commit().await?;