Skip to content

Commit

Permalink
Add conditions to creator upsert, add another check at DAS API level
Browse files Browse the repository at this point in the history
  • Loading branch information
danenbm committed Dec 18, 2023
1 parent 8803152 commit 1a80b98
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 69 deletions.
17 changes: 13 additions & 4 deletions digital_asset_types/src/dao/scopes/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,19 @@ pub async fn get_by_id(
creators.retain(|creator| creator.slot_updated == max_slot_updated);
}

// Any creators that are not the max seq are stale rows, so remove them.
let max_seq = creators.iter().map(|creator| creator.verified_seq).max();
if let Some(max_seq) = max_seq {
creators.retain(|creator| creator.verified_seq == max_seq);
// Any creators that are not the max seq are stale rows or updated by Token Metadata (seq = 0), so remove them.
let seq = if creators
.iter()
.map(|creator| creator.verified_seq)
.any(|seq| seq == Some(0))
{
Some(Some(0))
} else {
creators.iter().map(|creator| creator.verified_seq).max()
};

if let Some(seq) = seq {
creators.retain(|creator| creator.verified_seq == seq);
}
}

Expand Down
16 changes: 9 additions & 7 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,12 @@ where
T: ConnectionTrait + TransactionTrait,
{
// Vec to hold base creator information.
let mut db_creator_infos = Vec::with_capacity(creators.len());
let mut db_creators = Vec::with_capacity(creators.len());

if creators.is_empty() {
db_creator_infos.push(asset_creators::ActiveModel {
// Bubblegum supports empty creator array. In this case insert an empty Vec
// for the creator.
db_creators.push(asset_creators::ActiveModel {
asset_id: Set(id.clone()),
creator: Set(vec![]),
position: Set(0),
Expand All @@ -582,14 +584,14 @@ where
continue;
}

db_creator_infos.push(asset_creators::ActiveModel {
db_creators.push(asset_creators::ActiveModel {
asset_id: Set(id.clone()),
creator: Set(c.address.to_bytes().to_vec()),
position: Set(i as i16),
creator: Set(c.address.to_bytes().to_vec()),
share: Set(c.share as i32),
slot_updated: Set(Some(slot_updated)),
verified: Set(c.verified),
verified_seq: Set(Some(seq)),
slot_updated: Set(Some(slot_updated)),
..Default::default()
});

Expand All @@ -598,7 +600,7 @@ where
}

// This statement will update base information for each creator.
let mut query = asset_creators::Entity::insert_many(db_creator_infos)
let mut query = asset_creators::Entity::insert_many(db_creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
Expand All @@ -616,7 +618,7 @@ where
.build(DbBackend::Postgres);

query.sql = format!(
"{} WHERE excluded.verified_seq >= asset_creators.verified_seq OR asset_creators.verified_seq IS NULL",
"{} WHERE (asset_creators.verified_seq != 0 AND excluded.verified_seq >= asset_creators.verified_seq) OR asset_creators.verified_seq IS NULL",
query.sql
);

Expand Down
125 changes: 67 additions & 58 deletions nft_ingester/src/program_transformers/token_metadata/v1_asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,68 +319,77 @@ pub async fn save_v1_asset<T: ConnectionTrait + TransactionTrait>(

if !creators.is_empty() {
let mut creators_set = HashSet::new();
let existing_creators: Vec<asset_creators::Model> = asset_creators::Entity::find()
.filter(
Condition::all()
.add(asset_creators::Column::AssetId.eq(id.to_vec()))
.add(asset_creators::Column::SlotUpdated.lt(slot_i)),
)
.all(conn)
.await?;

if !existing_creators.is_empty() {
let mut db_creators = Vec::with_capacity(creators.len());
for (i, c) in creators.into_iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}
db_creators.push(asset_creators::ActiveModel {
asset_id: Set(id.to_vec()),
creator: Set(c.address.to_bytes().to_vec()),
share: Set(c.share as i32),
verified: Set(c.verified),
slot_updated: Set(Some(slot_i)),
position: Set(i as i16),
..Default::default()
});
creators_set.insert(c.address);
}
let txn = conn.begin().await?;
asset_creators::Entity::delete_many()
.filter(
Condition::all()
.add(asset_creators::Column::AssetId.eq(id.to_vec()))
.add(asset_creators::Column::SlotUpdated.lt(slot_i)),
)
.exec(&txn)
.await?;
// TODO: We may not need to care about existing creators.
// let existing_creators: Vec<asset_creators::Model> = asset_creators::Entity::find()
// .filter(
// Condition::all()
// .add(asset_creators::Column::AssetId.eq(id.to_vec()))
// .add(asset_creators::Column::SlotUpdated.lt(slot_i)),
// )
// .all(conn)
// .await?;

if !db_creators.is_empty() {
let mut query = asset_creators::Entity::insert_many(db_creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Position,
])
.update_columns([
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.slot_updated > asset_creators.slot_updated",
query.sql
);
txn.execute(query)
.await
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
//if !existing_creators.is_empty() {

let mut db_creators = Vec::with_capacity(creators.len());
for (i, c) in creators.into_iter().enumerate() {
if creators_set.contains(&c.address) {
continue;
}
txn.commit().await?;
db_creators.push(asset_creators::ActiveModel {
asset_id: Set(id.to_vec()),
position: Set(i as i16),
creator: Set(c.address.to_bytes().to_vec()),
share: Set(c.share as i32),
verified: Set(c.verified),
verified_seq: Set(Some(0)),
slot_updated: Set(Some(slot_i)),
..Default::default()
});
creators_set.insert(c.address);
}

let txn = conn.begin().await?;

// TODO: Delete we don't need to delete existing as it won't truly work with concurrent
// processes anyways so we should filter out stale rows at the API level.
// asset_creators::Entity::delete_many()
// .filter(
// Condition::all()
// .add(asset_creators::Column::AssetId.eq(id.to_vec()))
// .add(asset_creators::Column::SlotUpdated.lt(slot_i)),
// )
// .exec(&txn)
// .await?;

if !db_creators.is_empty() {
let mut query = asset_creators::Entity::insert_many(db_creators)
.on_conflict(
OnConflict::columns([
asset_creators::Column::AssetId,
asset_creators::Column::Position,
])
.update_columns([
asset_creators::Column::Creator,
asset_creators::Column::Share,
asset_creators::Column::Verified,
asset_creators::Column::VerifiedSeq,
asset_creators::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.slot_updated > asset_creators.slot_updated",
query.sql
);
txn.execute(query)
.await
.map_err(|db_err| IngesterError::AssetIndexError(db_err.to_string()))?;
}
txn.commit().await?;
//}
}
if uri.is_empty() {
warn!(
Expand Down

0 comments on commit 1a80b98

Please sign in to comment.