Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix creators re-ordering bug #162

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions digital_asset_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ publish = { workspace = true }
[dependencies]
async-trait = { workspace = true }
blockbuster = { workspace = true }
borsh = { version = "0.9.3", optional = true }
borsh-derive = { version = "0.9.3", optional = true }
borsh = { workspace = true, optional = true }
borsh-derive = { workerspace = true, optional = true }
danenbm marked this conversation as resolved.
Show resolved Hide resolved
bs58 = { workspace = true }
futures = { workspace = true }
indexmap = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod m20231206_120101_remove_was_decompressed;
mod m20240104_203133_add_cl_audits_v2;
mod m20240104_203328_remove_cl_audits;
mod m20240116_130744_add_update_metadata_ix;
mod m20240117_120101_alter_creator_indices;

pub mod model;

Expand Down Expand Up @@ -79,6 +80,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240104_203133_add_cl_audits_v2::Migration),
Box::new(m20240104_203328_remove_cl_audits::Migration),
Box::new(m20240116_130744_add_update_metadata_ix::Migration),
Box::new(m20240117_120101_alter_creator_indices::Migration),
]
}
}
81 changes: 81 additions & 0 deletions migration/src/m20240117_120101_alter_creator_indices.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use sea_orm_migration::prelude::*;

use crate::model::table::AssetCreators;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// New index to serve as a replacement for "asset_creator_unique" when someone queries GetAssetsByCreator.
// We add it before deleting the "asset_creator_unique" index to ensure no customer impact.
manager
.create_index(
Index::create()
.name("asset_creator_verified")
.table(AssetCreators::Table)
.col(AssetCreators::AssetId)
.col(AssetCreators::Creator)
.col(AssetCreators::Verified)
.to_owned(),
)
.await?;

// We no longer want to enforce uniques on the (asset_id, creator) pairs.
// We may end up with duplicate (asset_id, creator) pairs during indexing, because a creator can change position.
// Any stale rows (older seq/slot_updated) will be ignored, meaning the API users will never see duplicate creators.
manager
.drop_index(
sea_query::Index::drop()
.name("asset_creator_unique")
.table(AssetCreators::Table)
.to_owned(),
)
.await?;

// This index is unused and can be removed.
manager
.drop_index(
sea_query::Index::drop()
.name("asset_verified_creator")
.table(AssetCreators::Table)
.to_owned(),
)
.await?;
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_index(
Index::create()
.unique()
.name("asset_creator_unique")
.table(AssetCreators::Table)
.col(AssetCreators::AssetId)
.col(AssetCreators::Creator)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("asset_verified_creator")
.table(AssetCreators::Table)
.col(AssetCreators::AssetId)
.col(AssetCreators::Verified)
.to_owned(),
)
.await?;
manager
.drop_index(
sea_query::Index::drop()
.name("asset_creator_verified")
.table(AssetCreators::Table)
.to_owned(),
)
.await?;
Ok(())
}
}
86 changes: 31 additions & 55 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,70 +532,46 @@ pub async fn upsert_asset_creators<T>(
where
T: ConnectionTrait + TransactionTrait,
{
// Vec to hold base creator information.
let mut db_creators = Vec::with_capacity(creators.len());

if creators.is_empty() {
// Bubblegum supports empty creator array. In this case insert an empty Vec
// for the creator.
db_creators.push(asset_creators::ActiveModel {
let db_creators = creators
.iter()
.enumerate()
.map(|(i, creator)| asset_creators::ActiveModel {
asset_id: Set(id.clone()),
position: Set(0),
danenbm marked this conversation as resolved.
Show resolved Hide resolved
creator: Set(vec![]),
share: Set(100),
verified: Set(false),
position: Set(i as i16),
creator: Set(creator.address.to_bytes().to_vec()),
share: Set(creator.share as i32),
verified: Set(creator.verified),
slot_updated: Set(Some(slot_updated)),
seq: Set(Some(seq)),
..Default::default()
});
} else {
// Set to prevent duplicates.
let mut creators_set = HashSet::new();

for (i, c) in creators.iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}
NicolasPennie marked this conversation as resolved.
Show resolved Hide resolved
})
.collect::<Vec<_>>();

db_creators.push(asset_creators::ActiveModel {
asset_id: Set(id.clone()),
position: Set(i as i16),
creator: Set(c.address.to_bytes().to_vec()),
share: Set(c.share as i32),
verified: Set(c.verified),
slot_updated: Set(Some(slot_updated)),
seq: Set(Some(seq)),
..Default::default()
});

creators_set.insert(c.address);
}
}

// This statement will update base information for each creator.
let mut query = asset_creators::Entity::insert_many(db_creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Position,
])
.update_columns([
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::SlotUpdated,
asset_creators::Column::Seq,
])
.to_owned(),
)
.build(DbBackend::Postgres);
if !db_creators.is_empty() {
// This statement will update base information for each creator.
let mut query = asset_creators::Entity::insert_many(db_creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Position,
])
.update_columns([
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::Seq,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);

query.sql = format!(
query.sql = format!(
"{} WHERE (asset_creators.seq != 0 AND excluded.seq >= asset_creators.seq) OR asset_creators.seq IS NULL",
query.sql
);

txn.execute(query).await?;
txn.execute(query).await?;
}

Ok(())
}
Expand Down
81 changes: 38 additions & 43 deletions nft_ingester/src/program_transformers/token_metadata/v1_asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,52 +380,47 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
}

let creators = data.creators.unwrap_or_default();
if !creators.is_empty() {
let mut creators_set = HashSet::new();
let mut db_creators = Vec::with_capacity(creators.len());
for (i, c) in creators.into_iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}
db_creators.push(asset_creators::ActiveModel {
asset_id: Set(mint_pubkey_vec.clone()),
position: Set(i as i16),
creator: Set(c.address.to_bytes().to_vec()),
share: Set(c.share as i32),
verified: Set(c.verified),
slot_updated: Set(Some(slot_i)),
seq: Set(Some(0)),
..Default::default()
});
creators_set.insert(c.address);
}
let creators = data
.creators
.unwrap_or_default()
.iter()
.enumerate()
.map(|(i, creator)| asset_creators::ActiveModel {
asset_id: Set(mint_pubkey_vec.clone()),
position: Set(i as i16),
creator: Set(creator.address.to_bytes().to_vec()),
share: Set(creator.share as i32),
verified: Set(creator.verified),
slot_updated: Set(Some(slot_i)),
seq: Set(Some(0)),
..Default::default()
})
.collect::<Vec<_>>();

if !db_creators.is_empty() {
let mut query = asset_creators::Entity::insert_many(db_creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Position,
])
.update_columns([
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::Seq,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.slot_updated > asset_creators.slot_updated",
if !creators.is_empty() {
let mut query = asset_creators::Entity::insert_many(creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Position,
])
.update_columns([
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::Seq,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.slot_updated >= asset_creators.slot_updated OR asset_creators.slot_updated is NULL",
query.sql
);
txn.execute(query)
.await
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
}
txn.execute(query)
.await
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
}
txn.commit().await?;

Expand Down