Skip to content

Commit

Permalink
Factor out common creator update code to helper function
Browse files Browse the repository at this point in the history
  • Loading branch information
danenbm committed Oct 23, 2023
1 parent 77020fa commit ab1f1b4
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 197 deletions.
94 changes: 93 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use digital_asset_types::dao::{
sea_orm_active_enums::{ChainMutability, Mutability},
};
use log::{debug, info};
use mpl_bubblegum::types::Collection;
use mpl_bubblegum::types::{Collection, Creator};
use sea_orm::{
query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
};
use spl_account_compression::events::ChangeLogEventV1;
use std::collections::HashSet;

pub async fn save_changelog_event<'c, T>(
change_log_event: &ChangeLogEventV1,
Expand Down Expand Up @@ -610,3 +611,94 @@ where

Ok(())
}

pub async fn upsert_creators<T>(
txn: &T,
id: Vec<u8>,
creators: &Vec<Creator>,
slot_updated: i64,
seq: i64,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if creators_should_be_updated(txn, id.clone(), seq).await? {
if !creators.is_empty() {
// Vec to hold base creator information.
let mut db_creator_infos = Vec::with_capacity(creators.len());

// Vec to hold info on whether a creator is verified. This info is protected by `seq` number.
let mut db_creator_verified_infos = Vec::with_capacity(creators.len());

// Set to prevent duplicates.
let mut creators_set = HashSet::new();

for (i, c) in creators.iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}

db_creator_infos.push(asset_creators::ActiveModel {
asset_id: Set(id.clone()),
creator: Set(c.address.to_bytes().to_vec()),
position: Set(i as i16),
share: Set(c.share as i32),
slot_updated: Set(Some(slot_updated)),
..Default::default()
});

db_creator_verified_infos.push(asset_creators::ActiveModel {
asset_id: Set(id.clone()),
creator: Set(c.address.to_bytes().to_vec()),
verified: Set(c.verified),
verified_seq: Set(Some(seq)),
..Default::default()
});

creators_set.insert(c.address);
}

// This statement will update base information for each creator.
let query = asset_creators::Entity::insert_many(db_creator_infos)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Creator,
])
.update_columns([
asset_creators::Column::Position,
asset_creators::Column::Share,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
txn.execute(query).await?;

// This statement will update whether the creator is verified and the
// `verified_seq` number.
let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Creator,
])
.update_columns([
asset_creators::Column::Verified,
asset_creators::Column::VerifiedSeq,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL",
query.sql
);
txn.execute(query).await?;
}

upsert_asset_with_creators_added_seq(txn, id, seq).await?;
}

Ok(())
}
108 changes: 17 additions & 91 deletions nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::{
error::IngesterError,
program_transformers::bubblegum::{
asset_was_decompressed, creators_should_be_updated, save_changelog_event,
upsert_asset_data, upsert_asset_with_compression_info,
upsert_asset_with_creators_added_seq, upsert_asset_with_leaf_info,
asset_was_decompressed, save_changelog_event, upsert_asset_data,
upsert_asset_with_compression_info, upsert_asset_with_leaf_info,
upsert_asset_with_owner_and_delegate_info, upsert_asset_with_royalty_amount,
upsert_asset_with_seq, upsert_collection_info,
upsert_asset_with_seq, upsert_collection_info, upsert_creators,
},
tasks::{DownloadMetadata, IntoTaskData, TaskData},
};
Expand All @@ -20,8 +19,11 @@ use blockbuster::{
use chrono::Utc;
use digital_asset_types::{
dao::{
asset, asset_authority, asset_creators, asset_v1_account_attachments,
sea_orm_active_enums::{ChainMutability, Mutability, OwnerType, RoyaltyTargetType},
asset, asset_authority, asset_v1_account_attachments,
sea_orm_active_enums::{
ChainMutability, Mutability, OwnerType, RoyaltyTargetType, SpecificationAssetClass,
SpecificationVersions, V1AccountAttachments,
},
},
json::ChainDataV1,
};
Expand All @@ -30,11 +32,6 @@ use num_traits::FromPrimitive;
use sea_orm::{
entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, JsonValue,
};
use std::collections::HashSet;

use digital_asset_types::dao::sea_orm_active_enums::{
SpecificationAssetClass, SpecificationVersions, V1AccountAttachments,
};

// TODO -> consider moving structs into these functions to avoid clone

Expand Down Expand Up @@ -219,86 +216,15 @@ where
.await
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;

// Insert into `asset_creators` table as long as there wasn't a subsequent `update_metadata`.`
let creators = &metadata.creators;
if !creators.is_empty()
&& creators_should_be_updated(txn, id_bytes.to_vec(), seq as i64).await?
{
// Vec to hold base creator information.
let mut db_creator_infos = Vec::with_capacity(creators.len());

// Vec to hold info on whether a creator is verified. This info is protected by `seq` number.
let mut db_creator_verified_infos = Vec::with_capacity(creators.len());

// Set to prevent duplicates.
let mut creators_set = HashSet::new();

for (i, c) in creators.iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}

db_creator_infos.push(asset_creators::ActiveModel {
asset_id: Set(id_bytes.to_vec()),
creator: Set(c.address.to_bytes().to_vec()),
position: Set(i as i16),
share: Set(c.share as i32),
slot_updated: Set(Some(slot_i)),
..Default::default()
});

db_creator_verified_infos.push(asset_creators::ActiveModel {
asset_id: Set(id_bytes.to_vec()),
creator: Set(c.address.to_bytes().to_vec()),
verified: Set(c.verified),
verified_seq: Set(Some(seq as i64)),
..Default::default()
});

creators_set.insert(c.address);
}

// This statement will update base information for each creator.
let query = asset_creators::Entity::insert_many(db_creator_infos)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Creator,
])
.update_columns([
asset_creators::Column::Position,
asset_creators::Column::Share,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
txn.execute(query).await?;

// This statement will update whether the creator is verified and the
// `verified_seq` number.
let mut query = asset_creators::Entity::insert_many(db_creator_verified_infos)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Creator,
])
.update_columns([
asset_creators::Column::Verified,
asset_creators::Column::VerifiedSeq,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL",
query.sql
);
txn.execute(query).await?;

upsert_asset_with_creators_added_seq(txn, id_bytes.to_vec(), seq as i64)
.await?;
}
// Upsert into `asset_creators` table.
upsert_creators(
txn,
id_bytes.to_vec(),
&metadata.creators,
slot_i,
seq as i64,
)
.await?;

// Insert into `asset_authority` table.
let model = asset_authority::ActiveModel {
Expand Down
124 changes: 19 additions & 105 deletions nft_ingester/src/program_transformers/bubblegum/update_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
error::IngesterError,
program_transformers::bubblegum::{
asset_was_decompressed, creators_should_be_updated, save_changelog_event,
upsert_asset_data, upsert_asset_with_creators_added_seq, upsert_asset_with_leaf_info,
upsert_asset_with_royalty_amount, upsert_asset_with_seq,
asset_was_decompressed, save_changelog_event, upsert_asset_data,
upsert_asset_with_leaf_info, upsert_asset_with_royalty_amount, upsert_asset_with_seq,
upsert_creators,
},
tasks::{DownloadMetadata, IntoTaskData, TaskData},
};
Expand All @@ -22,10 +22,7 @@ use digital_asset_types::{
};
use log::warn;
use num_traits::FromPrimitive;
use sea_orm::{
entity::*, query::*, sea_query::OnConflict, ConnectionTrait, DbBackend, EntityTrait, JsonValue,
};
use std::collections::HashSet;
use sea_orm::{entity::*, query::*, ConnectionTrait, EntityTrait, JsonValue};

pub async fn update_metadata<'c, T>(
parsing_result: &BubblegumInstruction,
Expand Down Expand Up @@ -169,105 +166,22 @@ where
upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?;

// Update `asset_creators` table.
if creators_should_be_updated(txn, id_bytes.to_vec(), seq as i64).await? {
if let Some(creators) = &update_args.creators {
// Vec to hold base creator information.
let mut db_creator_infos = Vec::with_capacity(creators.len());

// Vec to hold info on whether a creator is verified. This info is protected by `seq` number.
let mut db_creator_verified_infos = Vec::with_capacity(creators.len());

// Set to prevent duplicates.
let mut creators_set = HashSet::new();

for (i, c) in creators.iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}

db_creator_infos.push(asset_creators::ActiveModel {
asset_id: Set(id_bytes.to_vec()),
creator: Set(c.address.to_bytes().to_vec()),
position: Set(i as i16),
share: Set(c.share as i32),
slot_updated: Set(Some(slot_i)),
..Default::default()
});

db_creator_verified_infos.push(asset_creators::ActiveModel {
asset_id: Set(id_bytes.to_vec()),
creator: Set(c.address.to_bytes().to_vec()),
verified: Set(c.verified),
verified_seq: Set(Some(seq as i64)),
..Default::default()
});

creators_set.insert(c.address);
}

// Remove creators no longer present in creator array.
let db_creators_to_remove: Vec<Vec<u8>> = current_metadata
.creators
.iter()
.filter(|c| !creators_set.contains(&c.address))
.map(|c| c.address.to_bytes().to_vec())
.collect();

asset_creators::Entity::delete_many()
.filter(
Condition::all()
.add(asset_creators::Column::AssetId.eq(id_bytes.to_vec()))
.add(
asset_creators::Column::Creator
.is_in(db_creators_to_remove),
),
)
.exec(txn)
.await?;

// This statement will update base information for each creator.
let query = asset_creators::Entity::insert_many(db_creator_infos)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Creator,
])
.update_columns([
asset_creators::Column::Position,
asset_creators::Column::Share,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
txn.execute(query).await?;

// This statement will update whether the creator is verified and the
// `verified_seq` number.
let mut query =
asset_creators::Entity::insert_many(db_creator_verified_infos)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Creator,
])
.update_columns([
asset_creators::Column::Verified,
asset_creators::Column::VerifiedSeq,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL",
query.sql
);
txn.execute(query).await?;

upsert_asset_with_creators_added_seq(txn, id_bytes.to_vec(), seq as i64)
.await?;
}
}
// Delete any existing creators.
asset_creators::Entity::delete_many()
.filter(
Condition::all().add(asset_creators::Column::AssetId.eq(id_bytes.to_vec())),
)
.exec(txn)
.await?;

// Upsert into `asset_creators` table.
let creators = if let Some(creators) = &update_args.creators {
creators
} else {
&current_metadata.creators
};
upsert_creators(txn, id_bytes.to_vec(), creators, slot_i, seq as i64).await?;

if uri.is_empty() {
warn!(
Expand Down

0 comments on commit ab1f1b4

Please sign in to comment.