Skip to content

Commit

Permalink
Fix creators re-ordering bug (metaplex-foundation#162)
Browse files Browse the repository at this point in the history
* Remove asset_creator_unique as we now allow duplicate rows

* fix creators re-ordering bug

* pr comments

* fix crgo file

---------

Co-authored-by: Michael Danenberg <[email protected]>
  • Loading branch information
NicolasPennie and danenbm authored Jan 24, 2024
1 parent 62bb23c commit 5d0177a
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 70 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ async-trait = "0.1.60"
base64 = "0.21.0"
blockbuster = "=0.9.0-beta.5"
borsh = "~0.10.3"
borsh-derive = "~0.10.3"
bs58 = "0.4.0"
cadence = "0.29.0"
cadence-macros = "0.29.0"
Expand Down
4 changes: 2 additions & 2 deletions digital_asset_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ publish = { workspace = true }
[dependencies]
async-trait = { workspace = true }
blockbuster = { workspace = true }
borsh = { version = "0.9.3", optional = true }
borsh-derive = { version = "0.9.3", optional = true }
borsh = { workspace = true, optional = true }
borsh-derive = { workspace = true, optional = true }
bs58 = { workspace = true }
futures = { workspace = true }
indexmap = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod m20231206_120101_remove_was_decompressed;
mod m20240104_203133_add_cl_audits_v2;
mod m20240104_203328_remove_cl_audits;
mod m20240116_130744_add_update_metadata_ix;
mod m20240117_120101_alter_creator_indices;

pub mod model;

Expand Down Expand Up @@ -79,6 +80,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240104_203133_add_cl_audits_v2::Migration),
Box::new(m20240104_203328_remove_cl_audits::Migration),
Box::new(m20240116_130744_add_update_metadata_ix::Migration),
Box::new(m20240117_120101_alter_creator_indices::Migration),
]
}
}
81 changes: 81 additions & 0 deletions migration/src/m20240117_120101_alter_creator_indices.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use sea_orm_migration::prelude::*;

use crate::model::table::AssetCreators;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// New index to serve as a replacement for "asset_creator_unique" when someone queries GetAssetsByCreator.
// We add it before deleting the "asset_creator_unique" index to ensure no customer impact.
manager
.create_index(
Index::create()
.name("asset_creator_verified")
.table(AssetCreators::Table)
.col(AssetCreators::AssetId)
.col(AssetCreators::Creator)
.col(AssetCreators::Verified)
.to_owned(),
)
.await?;

// We no longer want to enforce uniques on the (asset_id, creator) pairs.
// We may end up with duplicate (asset_id, creator) pairs during indexing, because a creator can change position.
// Any stale rows (older seq/slot_updated) will be ignored, meaning the API users will never see duplicate creators.
manager
.drop_index(
sea_query::Index::drop()
.name("asset_creator_unique")
.table(AssetCreators::Table)
.to_owned(),
)
.await?;

// This index is unused and can be removed.
manager
.drop_index(
sea_query::Index::drop()
.name("asset_verified_creator")
.table(AssetCreators::Table)
.to_owned(),
)
.await?;
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_index(
Index::create()
.unique()
.name("asset_creator_unique")
.table(AssetCreators::Table)
.col(AssetCreators::AssetId)
.col(AssetCreators::Creator)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("asset_verified_creator")
.table(AssetCreators::Table)
.col(AssetCreators::AssetId)
.col(AssetCreators::Verified)
.to_owned(),
)
.await?;
manager
.drop_index(
sea_query::Index::drop()
.name("asset_creator_verified")
.table(AssetCreators::Table)
.to_owned(),
)
.await?;
Ok(())
}
}
36 changes: 13 additions & 23 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,10 @@ pub async fn upsert_asset_creators<T>(
where
T: ConnectionTrait + TransactionTrait,
{
// Vec to hold base creator information.
let mut db_creators = Vec::with_capacity(creators.len());

if creators.is_empty() {
// Bubblegum supports empty creator array. In this case insert an empty Vec
// for the creator.
db_creators.push(asset_creators::ActiveModel {
let db_creators = if creators.is_empty() {
// If creators are empty, insert an empty creator with the current sequence.
// This prevents accidental errors during out-of-order updates.
vec![asset_creators::ActiveModel {
asset_id: Set(id.clone()),
position: Set(0),
creator: Set(vec![]),
Expand All @@ -547,17 +544,12 @@ where
slot_updated: Set(Some(slot_updated)),
seq: Set(Some(seq)),
..Default::default()
});
}]
} else {
// Set to prevent duplicates.
let mut creators_set = HashSet::new();

for (i, c) in creators.iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}

db_creators.push(asset_creators::ActiveModel {
creators
.iter()
.enumerate()
.map(|(i, c)| asset_creators::ActiveModel {
asset_id: Set(id.clone()),
position: Set(i as i16),
creator: Set(c.address.to_bytes().to_vec()),
Expand All @@ -566,11 +558,9 @@ where
slot_updated: Set(Some(slot_updated)),
seq: Set(Some(seq)),
..Default::default()
});

creators_set.insert(c.address);
}
}
})
.collect()
};

// This statement will update base information for each creator.
let mut query = asset_creators::Entity::insert_many(db_creators)
Expand All @@ -583,8 +573,8 @@ where
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::SlotUpdated,
asset_creators::Column::Seq,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
Expand Down
81 changes: 38 additions & 43 deletions nft_ingester/src/program_transformers/token_metadata/v1_asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,52 +380,47 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
}

let creators = data.creators.unwrap_or_default();
if !creators.is_empty() {
let mut creators_set = HashSet::new();
let mut db_creators = Vec::with_capacity(creators.len());
for (i, c) in creators.into_iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}
db_creators.push(asset_creators::ActiveModel {
asset_id: Set(mint_pubkey_vec.clone()),
position: Set(i as i16),
creator: Set(c.address.to_bytes().to_vec()),
share: Set(c.share as i32),
verified: Set(c.verified),
slot_updated: Set(Some(slot_i)),
seq: Set(Some(0)),
..Default::default()
});
creators_set.insert(c.address);
}
let creators = data
.creators
.unwrap_or_default()
.iter()
.enumerate()
.map(|(i, creator)| asset_creators::ActiveModel {
asset_id: Set(mint_pubkey_vec.clone()),
position: Set(i as i16),
creator: Set(creator.address.to_bytes().to_vec()),
share: Set(creator.share as i32),
verified: Set(creator.verified),
slot_updated: Set(Some(slot_i)),
seq: Set(Some(0)),
..Default::default()
})
.collect::<Vec<_>>();

if !db_creators.is_empty() {
let mut query = asset_creators::Entity::insert_many(db_creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Position,
])
.update_columns([
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::Seq,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.slot_updated > asset_creators.slot_updated",
if !creators.is_empty() {
let mut query = asset_creators::Entity::insert_many(creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Position,
])
.update_columns([
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::Seq,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL",
query.sql
);
txn.execute(query)
.await
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
}
txn.execute(query)
.await
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
}
txn.commit().await?;

Expand Down

0 comments on commit 5d0177a

Please sign in to comment.