-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: collection hash indexing #104
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,75 +1,111 @@ | ||
use anchor_lang::prelude::Pubkey; | ||
use blockbuster::{ | ||
instruction::InstructionBundle, | ||
programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, | ||
}; | ||
use digital_asset_types::dao::{asset, asset_grouping}; | ||
use digital_asset_types::dao::{asset, asset_grouping, cl_items}; | ||
use log::{debug, info, warn}; | ||
use mpl_bubblegum::{hash_metadata, state::metaplex_adapter::Collection}; | ||
use sea_orm::{entity::*, query::*, sea_query::OnConflict, DbBackend, Set, Unchanged}; | ||
use solana_sdk::keccak; | ||
|
||
use super::{save_changelog_event, update_compressed_asset}; | ||
use crate::error::IngesterError; | ||
pub async fn process<'c, T>( | ||
parsing_result: &BubblegumInstruction, | ||
bundle: &InstructionBundle<'c>, | ||
txn: &'c T, | ||
verify: bool, | ||
instruction: &str, | ||
) -> Result<(), IngesterError> | ||
where | ||
T: ConnectionTrait + TransactionTrait, | ||
{ | ||
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { | ||
// Do we need to update the `slot_updated` field as well as part of the table | ||
// updates below? | ||
if let (Some(le), Some(cl), Some(payload)) = ( | ||
&parsing_result.leaf_update, | ||
&parsing_result.tree_update, | ||
&parsing_result.payload, | ||
) { | ||
let (collection, verify, metadata) = match payload { | ||
Payload::CollectionVerification { | ||
collection, | ||
verify, | ||
args, | ||
} => (collection.clone(), verify.clone(), args.clone()), | ||
_ => { | ||
return Err(IngesterError::ParsingError( | ||
"Ix not parsed correctly".to_string(), | ||
)); | ||
} | ||
}; | ||
debug!( | ||
"Handling collection verification event for {} (verify: {}): {}", | ||
collection, verify, bundle.txn_id | ||
); | ||
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; | ||
match le.schema { | ||
LeafSchema::V1 { id, .. } => { | ||
let id_bytes = id.to_bytes().to_vec(); | ||
let id_bytes = match le.schema { | ||
LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), | ||
}; | ||
|
||
let mut updated_metadata = metadata.clone(); | ||
updated_metadata.collection = Some(Collection { | ||
key: collection.clone(), | ||
verified: verify, | ||
}); | ||
|
||
let asset_to_update = asset::ActiveModel { | ||
id: Unchanged(id_bytes.clone()), | ||
leaf: Set(Some(le.leaf_hash.to_vec())), | ||
seq: Set(seq as i64), | ||
..Default::default() | ||
}; | ||
update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?; | ||
let updated_data_hash = hash_metadata(&updated_metadata) | ||
.map(|e| bs58::encode(e).into_string()) | ||
.unwrap_or("".to_string()) | ||
.trim() | ||
.to_string(); | ||
|
||
if verify { | ||
if let Some(Payload::SetAndVerifyCollection { collection }) = | ||
parsing_result.payload | ||
{ | ||
let grouping = asset_grouping::ActiveModel { | ||
asset_id: Set(id_bytes.clone()), | ||
group_key: Set("collection".to_string()), | ||
group_value: Set(Some(collection.to_string())), | ||
seq: Set(seq as i64), | ||
slot_updated: Set(bundle.slot as i64), | ||
..Default::default() | ||
}; | ||
let mut query = asset_grouping::Entity::insert(grouping) | ||
.on_conflict( | ||
OnConflict::columns([ | ||
asset_grouping::Column::AssetId, | ||
asset_grouping::Column::GroupKey, | ||
]) | ||
.update_columns([ | ||
asset_grouping::Column::GroupKey, | ||
asset_grouping::Column::GroupValue, | ||
asset_grouping::Column::Seq, | ||
asset_grouping::Column::SlotUpdated, | ||
]) | ||
.to_owned(), | ||
) | ||
.build(DbBackend::Postgres); | ||
query.sql = format!( | ||
let asset_to_update = asset::ActiveModel { | ||
id: Unchanged(id_bytes.clone()), | ||
leaf: Set(Some(le.leaf_hash.to_vec())), | ||
seq: Set(seq as i64), | ||
data_hash: Set(Some(updated_data_hash.clone())), // todo remove clone | ||
..Default::default() | ||
}; | ||
update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?; | ||
|
||
if verify { | ||
let grouping = asset_grouping::ActiveModel { | ||
asset_id: Set(id_bytes.clone()), | ||
group_key: Set("collection".to_string()), | ||
group_value: Set(Some(collection.to_string())), | ||
seq: Set(seq as i64), | ||
slot_updated: Set(bundle.slot as i64), | ||
..Default::default() | ||
}; | ||
let mut query = asset_grouping::Entity::insert(grouping) | ||
.on_conflict( | ||
OnConflict::columns([ | ||
asset_grouping::Column::AssetId, | ||
asset_grouping::Column::GroupKey, | ||
]) | ||
.update_columns([ | ||
asset_grouping::Column::GroupKey, | ||
asset_grouping::Column::GroupValue, | ||
asset_grouping::Column::Seq, | ||
asset_grouping::Column::SlotUpdated, | ||
]) | ||
.to_owned(), | ||
) | ||
.build(DbBackend::Postgres); | ||
query.sql = format!( | ||
"{} WHERE excluded.slot_updated > asset_grouping.slot_updated AND excluded.seq >= asset_grouping.seq", | ||
query.sql | ||
); | ||
txn.execute(query).await?; | ||
} | ||
} | ||
id_bytes | ||
} | ||
}; | ||
txn.execute(query).await?; | ||
} else { | ||
// TODO: Support collection unverification. | ||
// We will likely need to nullify the collection field so we can maintain | ||
// the seq value and avoid out-of-order indexing bugs. | ||
Comment on lines
+100
to
+102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I originally was thinking along this route but ended up just adding a Maybe after your change to blockbuster, I would not need a separate sequence number because i could update Here is my PR for reference: metaplex-foundation#90 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only thing to factor in is that creators can be removed, so we need to be able to support that case too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah I agree nullification is better future proofing for when we have metadata updates |
||
warn!( | ||
"Collection unverification not processed for asset {} and collection {}", | ||
bs58::encode(id_bytes).into_string(), | ||
collection | ||
); | ||
} | ||
|
||
return Ok(()); | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, not sure if we want the slot_updated here, because there could be multiple transactions in the same slot when just dealing with Bubblegum transactions?