From ab1f1b49dab7fc9cc0660fc572d29cd6fc7de434 Mon Sep 17 00:00:00 2001 From: Michael Danenberg <56533526+danenbm@users.noreply.github.com> Date: Mon, 23 Oct 2023 12:33:29 -0700 Subject: [PATCH] Factor out common creator update code to helper function --- .../src/program_transformers/bubblegum/db.rs | 94 ++++++++++++- .../program_transformers/bubblegum/mint_v1.rs | 108 +++------------ .../bubblegum/update_metadata.rs | 124 +++--------------- 3 files changed, 129 insertions(+), 197 deletions(-) diff --git a/nft_ingester/src/program_transformers/bubblegum/db.rs b/nft_ingester/src/program_transformers/bubblegum/db.rs index 2738bcb7f..e788526db 100644 --- a/nft_ingester/src/program_transformers/bubblegum/db.rs +++ b/nft_ingester/src/program_transformers/bubblegum/db.rs @@ -4,11 +4,12 @@ use digital_asset_types::dao::{ sea_orm_active_enums::{ChainMutability, Mutability}, }; use log::{debug, info}; -use mpl_bubblegum::types::Collection; +use mpl_bubblegum::types::{Collection, Creator}; use sea_orm::{ query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait, }; use spl_account_compression::events::ChangeLogEventV1; +use std::collections::HashSet; pub async fn save_changelog_event<'c, T>( change_log_event: &ChangeLogEventV1, @@ -610,3 +611,94 @@ where Ok(()) } + +pub async fn upsert_creators( + txn: &T, + id: Vec, + creators: &Vec, + slot_updated: i64, + seq: i64, +) -> Result<(), IngesterError> +where + T: ConnectionTrait + TransactionTrait, +{ + if creators_should_be_updated(txn, id.clone(), seq).await? { + if !creators.is_empty() { + // Vec to hold base creator information. + let mut db_creator_infos = Vec::with_capacity(creators.len()); + + // Vec to hold info on whether a creator is verified. This info is protected by `seq` number. + let mut db_creator_verified_infos = Vec::with_capacity(creators.len()); + + // Set to prevent duplicates. + let mut creators_set = HashSet::new(); + + for (i, c) in creators.iter().enumerate() { + if creators_set.contains(&c.address) { + continue; + } + + db_creator_infos.push(asset_creators::ActiveModel { + asset_id: Set(id.clone()), + creator: Set(c.address.to_bytes().to_vec()), + position: Set(i as i16), + share: Set(c.share as i32), + slot_updated: Set(Some(slot_updated)), + ..Default::default() + }); + + db_creator_verified_infos.push(asset_creators::ActiveModel { + asset_id: Set(id.clone()), + creator: Set(c.address.to_bytes().to_vec()), + verified: Set(c.verified), + verified_seq: Set(Some(seq)), + ..Default::default() + }); + + creators_set.insert(c.address); + } + + // This statement will update base information for each creator. + let query = asset_creators::Entity::insert_many(db_creator_infos) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ + asset_creators::Column::Position, + asset_creators::Column::Share, + asset_creators::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + txn.execute(query).await?; + + // This statement will update whether the creator is verified and the + // `verified_seq` number. + let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos) + .on_conflict( + OnConflict::columns([ + asset_creators::Column::AssetId, + asset_creators::Column::Creator, + ]) + .update_columns([ + asset_creators::Column::Verified, + asset_creators::Column::VerifiedSeq, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( + "{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL", + query.sql + ); + txn.execute(query).await?; + } + + upsert_asset_with_creators_added_seq(txn, id, seq).await?; + } + + Ok(()) +} diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index 8ba918e52..6f0a22275 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -1,11 +1,10 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - asset_was_decompressed, creators_should_be_updated, save_changelog_event, - upsert_asset_data, upsert_asset_with_compression_info, - upsert_asset_with_creators_added_seq, upsert_asset_with_leaf_info, + asset_was_decompressed, save_changelog_event, upsert_asset_data, + upsert_asset_with_compression_info, upsert_asset_with_leaf_info, upsert_asset_with_owner_and_delegate_info, upsert_asset_with_royalty_amount, - upsert_asset_with_seq, upsert_collection_info, + upsert_asset_with_seq, upsert_collection_info, upsert_creators, }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; @@ -20,8 +19,11 @@ use blockbuster::{ use chrono::Utc; use digital_asset_types::{ dao::{ - asset, asset_authority, asset_creators, asset_v1_account_attachments, - sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType}, + asset, asset_authority, asset_v1_account_attachments, + sea_orm_active_enums::{ + ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass, + SpecificationVersions, V1AccountAttachments, + }, }, json::ChainDataV1, }; @@ -30,11 +32,6 @@ use num_traits::FromPrimitive; use sea_orm::{ entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, JsonValue, }; -use std::collections::HashSet; - -use digital_asset_types::dao::sea_orm_active_enums::{ - SpecificationAssetClass, SpecificationVersions, V1AccountAttachments, -}; // TODO -> consider moving structs into these functions to avoid clone @@ -219,86 +216,15 @@ where .await .map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?; - // Insert into `asset_creators` table as long as there wasn't a subsequent `update_metadata`.` - let creators = &metadata.creators; - if !creators.is_empty() - && creators_should_be_updated(txn, id_bytes.to_vec(), seq as i64).await? - { - // Vec to hold base creator information. - let mut db_creator_infos = Vec::with_capacity(creators.len()); - - // Vec to hold info on whether a creator is verified. This info is protected by `seq` number. - let mut db_creator_verified_infos = Vec::with_capacity(creators.len()); - - // Set to prevent duplicates. - let mut creators_set = HashSet::new(); - - for (i, c) in creators.iter().enumerate() { - if creators_set.contains(&c.address) { - continue; - } - - db_creator_infos.push(asset_creators::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - position: Set(i as i16), - share: Set(c.share as i32), - slot_updated: Set(Some(slot_i)), - ..Default::default() - }); - - db_creator_verified_infos.push(asset_creators::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - verified: Set(c.verified), - verified_seq: Set(Some(seq as i64)), - ..Default::default() - }); - - creators_set.insert(c.address); - } - - // This statement will update base information for each creator. - let query = asset_creators::Entity::insert_many(db_creator_infos) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Position, - asset_creators::Column::Share, - asset_creators::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - - // This statement will update whether the creator is verified and the - // `verified_seq` number. - let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Verified, - asset_creators::Column::VerifiedSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL", - query.sql - ); - txn.execute(query).await?; - - upsert_asset_with_creators_added_seq(txn, id_bytes.to_vec(), seq as i64) - .await?; - } + // Upsert into `asset_creators` table. + upsert_creators( + txn, + id_bytes.to_vec(), + &metadata.creators, + slot_i, + seq as i64, + ) + .await?; // Insert into `asset_authority` table. let model = asset_authority::ActiveModel { diff --git a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs index 1b08f20e4..835f9cd0c 100644 --- a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs +++ b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs @@ -1,9 +1,9 @@ use crate::{ error::IngesterError, program_transformers::bubblegum::{ - asset_was_decompressed, creators_should_be_updated, save_changelog_event, - upsert_asset_data, upsert_asset_with_creators_added_seq, upsert_asset_with_leaf_info, - upsert_asset_with_royalty_amount, upsert_asset_with_seq, + asset_was_decompressed, save_changelog_event, upsert_asset_data, + upsert_asset_with_leaf_info, upsert_asset_with_royalty_amount, upsert_asset_with_seq, + upsert_creators, }, tasks::{DownloadMetadata, IntoTaskData, TaskData}, }; @@ -22,10 +22,7 @@ use digital_asset_types::{ }; use log::warn; use num_traits::FromPrimitive; -use sea_orm::{ - entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, JsonValue, -}; -use std::collections::HashSet; +use sea_orm::{entity::*, query::*, ConnectionTrait, EntityTrait, JsonValue}; pub async fn update_metadata<'c, T>( parsing_result: &BubblegumInstruction, @@ -169,105 +166,22 @@ where upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; // Update `asset_creators` table. - if creators_should_be_updated(txn, id_bytes.to_vec(), seq as i64).await? { - if let Some(creators) = &update_args.creators { - // Vec to hold base creator information. - let mut db_creator_infos = Vec::with_capacity(creators.len()); - - // Vec to hold info on whether a creator is verified. This info is protected by `seq` number. - let mut db_creator_verified_infos = Vec::with_capacity(creators.len()); - - // Set to prevent duplicates. - let mut creators_set = HashSet::new(); - - for (i, c) in creators.iter().enumerate() { - if creators_set.contains(&c.address) { - continue; - } - - db_creator_infos.push(asset_creators::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - position: Set(i as i16), - share: Set(c.share as i32), - slot_updated: Set(Some(slot_i)), - ..Default::default() - }); - - db_creator_verified_infos.push(asset_creators::ActiveModel { - asset_id: Set(id_bytes.to_vec()), - creator: Set(c.address.to_bytes().to_vec()), - verified: Set(c.verified), - verified_seq: Set(Some(seq as i64)), - ..Default::default() - }); - - creators_set.insert(c.address); - } - // Remove creators no longer present in creator array. - let db_creators_to_remove: Vec> = current_metadata - .creators - .iter() - .filter(|c| !creators_set.contains(&c.address)) - .map(|c| c.address.to_bytes().to_vec()) - .collect(); - - asset_creators::Entity::delete_many() - .filter( - Condition::all() - .add(asset_creators::Column::AssetId.eq(id_bytes.to_vec())) - .add( - asset_creators::Column::Creator - .is_in(db_creators_to_remove), - ), - ) - .exec(txn) - .await?; - - // This statement will update base information for each creator. - let query = asset_creators::Entity::insert_many(db_creator_infos) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Position, - asset_creators::Column::Share, - asset_creators::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - txn.execute(query).await?; - - // This statement will update whether the creator is verified and the - // `verified_seq` number. - let mut query = - asset_creators::Entity::insert_many(db_creator_verified_infos) - .on_conflict( - OnConflict::columns([ - asset_creators::Column::AssetId, - asset_creators::Column::Creator, - ]) - .update_columns([ - asset_creators::Column::Verified, - asset_creators::Column::VerifiedSeq, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( - "{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL", - query.sql - ); - txn.execute(query).await?; - - upsert_asset_with_creators_added_seq(txn, id_bytes.to_vec(), seq as i64) - .await?; - } - } + // Delete any existing creators. + asset_creators::Entity::delete_many() + .filter( + Condition::all().add(asset_creators::Column::AssetId.eq(id_bytes.to_vec())), + ) + .exec(txn) + .await?; + + // Upsert into `asset_creators` table. + let creators = if let Some(creators) = &update_args.creators { + creators + } else { + ¤t_metadata.creators + }; + upsert_creators(txn, id_bytes.to_vec(), creators, slot_i, seq as i64).await?; if uri.is_empty() { warn!(