diff --git a/Cargo.lock b/Cargo.lock index c8940e494..5cf0c6eee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -985,8 +985,6 @@ checksum = "8d696c370c750c948ada61c69a0ee2cbbb9c50b1019ddb86d9317157a99c2cae" [[package]] name = "blockbuster" version = "0.9.0-beta.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56e0240c1218958c0d51284d783fa055f551d769bb8b7a4abf635b17fa9620dc" dependencies = [ "anchor-lang", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 5aff0b7fe..1f93607d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,9 @@ tracing-subscriber = "0.3.16" txn_forwarder = { path = "tools/txn_forwarder" } url = "2.3.1" +[patch.crates-io] +blockbuster = { path = "/home/kirill/projects/blockbuster/blockbuster" } + [workspace.lints.clippy] clone_on_ref_ptr = "deny" missing_const_for_fn = "deny" diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs index 354aaf9c6..02cf4bdb6 100644 --- a/nft_ingester/src/account_updates.rs +++ b/nft_ingester/src/account_updates.rs @@ -2,6 +2,7 @@ use { crate::{ metric, metrics::capture_result, + plerkle::{parse_pubkey, parse_vector}, tasks::{create_download_metadata_notifier, TaskData}, }, cadence_macros::{is_global_default_set, statsd_count, statsd_time}, @@ -9,7 +10,7 @@ use { log::{debug, error}, plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData}, plerkle_serialization::root_as_account_info, - program_transformers::ProgramTransformer, + program_transformers::{error::ProgramTransformerResult, AccountInfo, ProgramTransformer}, sqlx::{Pool, Postgres}, std::sync::Arc, tokio::{ @@ -103,7 +104,7 @@ async fn handle_account( account = Some(bs58::encode(pubkey.0.as_slice()).into_string()); } let begin_processing = Instant::now(); - let res = manager.handle_account_update(account_update).await; + let res = handle_account_update(manager, account_update).await; let should_ack = capture_result( id.clone(), stream_key, @@ -120,3 +121,17 @@ async fn handle_account( } ret_id } + +async fn handle_account_update<'a>( + manager: Arc, + account_update: plerkle_serialization::AccountInfo<'_>, +) -> ProgramTransformerResult<()> { + manager + .handle_account_update(&AccountInfo { + slot: account_update.slot(), + pubkey: &parse_pubkey(account_update.pubkey())?, + owner: &parse_pubkey(account_update.owner())?, + data: parse_vector(account_update.data())?, + }) + .await +} diff --git a/nft_ingester/src/lib.rs b/nft_ingester/src/lib.rs index 9aee9c6ba..84f5852f5 100644 --- a/nft_ingester/src/lib.rs +++ b/nft_ingester/src/lib.rs @@ -5,6 +5,7 @@ pub mod config; pub mod database; pub mod error; pub mod metrics; +pub mod plerkle; pub mod stream; pub mod tasks; pub mod transaction_notifications; diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 1c13320b5..b533458c5 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -5,6 +5,7 @@ pub mod config; mod database; pub mod error; pub mod metrics; +mod plerkle; mod stream; pub mod tasks; mod transaction_notifications; diff --git a/nft_ingester/src/plerkle.rs b/nft_ingester/src/plerkle.rs new file mode 100644 index 000000000..530a8bc98 --- /dev/null +++ b/nft_ingester/src/plerkle.rs @@ -0,0 +1,26 @@ +use { + flatbuffers::Vector, + plerkle_serialization::Pubkey as FBPubkey, + program_transformers::error::{ProgramTransformerError, ProgramTransformerResult}, + solana_sdk::pubkey::Pubkey, +}; + +pub fn parse_pubkey(pubkey: Option<&FBPubkey>) -> ProgramTransformerResult { + Ok(Pubkey::try_from( + pubkey + .ok_or_else(|| { + ProgramTransformerError::DeserializationError( + "Could not deserialize data".to_owned(), + ) + })? + .0 + .as_slice(), + ) + .expect("valid key from FlatBuffer")) +} + +pub fn parse_vector(data: Option>) -> ProgramTransformerResult<&[u8]> { + data.map(|data| data.bytes()).ok_or_else(|| { + ProgramTransformerError::DeserializationError("Could not deserialize data".to_owned()) + }) +} diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index 03f582060..bf4ac0623 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -14,7 +14,7 @@ use { }, }, futures::future::BoxFuture, - plerkle_serialization::{AccountInfo, Pubkey as FBPubkey, TransactionInfo}, + plerkle_serialization::{Pubkey as FBPubkey, TransactionInfo}, sea_orm::{DatabaseConnection, SqlxPostgresConnector}, solana_sdk::pubkey::Pubkey, sqlx::PgPool, @@ -27,6 +27,14 @@ pub mod error; mod token; mod token_metadata; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AccountInfo<'a> { + pub slot: u64, + pub pubkey: &'a Pubkey, + pub owner: &'a Pubkey, + pub data: &'a [u8], +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct DownloadMetadataInfo { asset_data_id: Vec, @@ -97,9 +105,8 @@ impl ProgramTransformer { } #[allow(clippy::borrowed_box)] - pub fn match_program(&self, key: &FBPubkey) -> Option<&Box> { - self.parsers - .get(&Pubkey::try_from(key.0.as_slice()).expect("valid key from FlatBuffer")) + pub fn match_program(&self, key: &Pubkey) -> Option<&Box> { + self.parsers.get(key) } pub async fn handle_transaction<'a>( @@ -151,7 +158,9 @@ impl ProgramTransformer { slot, }; - if let Some(program) = self.match_program(&ix.program) { + let program_key = + Pubkey::try_from(ix.program.0.as_slice()).expect("valid key from FlatBuffer"); + if let Some(program) = self.match_program(&program_key) { debug!("Found a ix for program: {:?}", program.key()); let result = program.handle_instruction(&ix)?; let concrete = result.result_type(); @@ -187,18 +196,16 @@ impl ProgramTransformer { Ok(()) } - pub async fn handle_account_update<'b>( + pub async fn handle_account_update( &self, - acct: AccountInfo<'b>, + account_info: &AccountInfo<'_>, ) -> ProgramTransformerResult<()> { - let owner = acct.owner().unwrap(); - if let Some(program) = self.match_program(owner) { - let result = program.handle_account(&acct)?; - let concrete = result.result_type(); - match concrete { + if let Some(program) = self.match_program(account_info.owner) { + let result = program.handle_account(account_info.data)?; + match result.result_type() { ProgramParseResult::TokenMetadata(parsing_result) => { handle_token_metadata_account( - &acct, + account_info, parsing_result, &self.storage, &self.download_metadata_notifier, @@ -207,7 +214,7 @@ impl ProgramTransformer { } ProgramParseResult::TokenProgramAccount(parsing_result) => { handle_token_program_account( - &acct, + account_info, parsing_result, &self.storage, &self.download_metadata_notifier, diff --git a/program_transformers/src/token/mod.rs b/program_transformers/src/token/mod.rs index 690c44cae..5098d743a 100644 --- a/program_transformers/src/token/mod.rs +++ b/program_transformers/src/token/mod.rs @@ -1,11 +1,10 @@ use { crate::{ error::{ProgramTransformerError, ProgramTransformerResult}, - DownloadMetadataNotifier, + AccountInfo, DownloadMetadataNotifier, }, blockbuster::programs::token_account::TokenProgramAccount, digital_asset_types::dao::{asset, token_accounts, tokens}, - plerkle_serialization::AccountInfo, sea_orm::{ entity::{ActiveModelTrait, ActiveValue, ColumnTrait}, query::{QueryFilter, QueryTrait}, @@ -16,15 +15,14 @@ use { spl_token::state::AccountState, }; -pub async fn handle_token_program_account<'a, 'b, 'c>( - account_update: &'a AccountInfo<'a>, - parsing_result: &'b TokenProgramAccount, - db: &'c DatabaseConnection, +pub async fn handle_token_program_account( + account_info: &AccountInfo<'_>, + parsing_result: &TokenProgramAccount, + db: &DatabaseConnection, _download_metadata_notifier: &DownloadMetadataNotifier, ) -> ProgramTransformerResult<()> { - let key = *account_update.pubkey().unwrap(); - let key_bytes = key.0.to_vec(); - let spl_token_program = account_update.owner().unwrap().0.to_vec(); + let key_bytes = account_info.pubkey.to_bytes().to_vec(); + let spl_token_program = account_info.owner.to_bytes().to_vec(); match &parsing_result { TokenProgramAccount::TokenAccount(ta) => { let mint = ta.mint.to_bytes().to_vec(); @@ -42,7 +40,7 @@ pub async fn handle_token_program_account<'a, 'b, 'c>( frozen: ActiveValue::Set(frozen), delegated_amount: ActiveValue::Set(ta.delegated_amount as i64), token_program: ActiveValue::Set(spl_token_program), - slot_updated: ActiveValue::Set(account_update.slot() as i64), + slot_updated: ActiveValue::Set(account_info.slot as i64), amount: ActiveValue::Set(ta.amount as i64), close_authority: ActiveValue::Set(None), }; @@ -99,7 +97,7 @@ pub async fn handle_token_program_account<'a, 'b, 'c>( let model = tokens::ActiveModel { mint: ActiveValue::Set(key_bytes.clone()), token_program: ActiveValue::Set(spl_token_program), - slot_updated: ActiveValue::Set(account_update.slot() as i64), + slot_updated: ActiveValue::Set(account_info.slot as i64), supply: ActiveValue::Set(m.supply as i64), decimals: ActiveValue::Set(m.decimals as i32), close_authority: ActiveValue::Set(None), @@ -142,6 +140,5 @@ pub async fn handle_token_program_account<'a, 'b, 'c>( Ok(()) } _ => Err(ProgramTransformerError::NotImplemented), - }?; - Ok(()) + } } diff --git a/program_transformers/src/token_metadata/master_edition.rs b/program_transformers/src/token_metadata/master_edition.rs index 2cba19853..af41f1eee 100644 --- a/program_transformers/src/token_metadata/master_edition.rs +++ b/program_transformers/src/token_metadata/master_edition.rs @@ -5,17 +5,17 @@ use { asset, asset_v1_account_attachments, sea_orm_active_enums::{SpecificationAssetClass, V1AccountAttachments}, }, - plerkle_serialization::Pubkey as FBPubkey, sea_orm::{ entity::{ActiveModelTrait, ActiveValue, EntityTrait, RelationTrait}, query::{JoinType, QuerySelect, QueryTrait}, sea_query::query::OnConflict, ConnectionTrait, DatabaseTransaction, DbBackend, }, + solana_sdk::pubkey::Pubkey, }; pub async fn save_v2_master_edition( - id: FBPubkey, + id: &Pubkey, slot: u64, me_data: &MasterEditionV2, txn: &DatabaseTransaction, @@ -31,7 +31,7 @@ pub async fn save_v2_master_edition( } pub async fn save_v1_master_edition( - id: FBPubkey, + id: &Pubkey, slot: u64, me_data: &MasterEditionV1, txn: &DatabaseTransaction, @@ -52,14 +52,14 @@ pub async fn save_v1_master_edition( } pub async fn save_master_edition( _version: V1AccountAttachments, - id: FBPubkey, + id: &Pubkey, slot: u64, me_data: &MasterEditionV2, txn: &DatabaseTransaction, ) -> ProgramTransformerResult<()> { - let id_bytes = id.0.to_vec(); + let id = id.to_bytes().to_vec(); let master_edition: Option<(asset_v1_account_attachments::Model, Option)> = - asset_v1_account_attachments::Entity::find_by_id(id.0.to_vec()) + asset_v1_account_attachments::Entity::find_by_id(id.clone()) .find_also_related(asset::Entity) .join(JoinType::InnerJoin, asset::Relation::AssetData.def()) .one(txn) @@ -68,7 +68,7 @@ pub async fn save_master_edition( .map_err(|e| ProgramTransformerError::SerializatonError(e.to_string()))?; let model = asset_v1_account_attachments::ActiveModel { - id: ActiveValue::Set(id_bytes), + id: ActiveValue::Set(id), attachment_type: ActiveValue::Set(V1AccountAttachments::MasterEditionV1), data: ActiveValue::Set(Some(ser)), slot_updated: ActiveValue::Set(slot as i64), diff --git a/program_transformers/src/token_metadata/mod.rs b/program_transformers/src/token_metadata/mod.rs index 708b7f867..4be03f1b2 100644 --- a/program_transformers/src/token_metadata/mod.rs +++ b/program_transformers/src/token_metadata/mod.rs @@ -5,38 +5,34 @@ use { master_edition::{save_v1_master_edition, save_v2_master_edition}, v1_asset::{burn_v1_asset, save_v1_asset}, }, - DownloadMetadataNotifier, + AccountInfo, DownloadMetadataNotifier, }, blockbuster::programs::token_metadata::{TokenMetadataAccountData, TokenMetadataAccountState}, - plerkle_serialization::AccountInfo, sea_orm::{DatabaseConnection, TransactionTrait}, }; mod master_edition; mod v1_asset; -pub async fn handle_token_metadata_account<'a, 'b, 'c>( - account_update: &'a AccountInfo<'a>, - parsing_result: &'b TokenMetadataAccountState, - db: &'c DatabaseConnection, +pub async fn handle_token_metadata_account( + account_info: &AccountInfo<'_>, + parsing_result: &TokenMetadataAccountState, + db: &DatabaseConnection, download_metadata_notifier: &DownloadMetadataNotifier, ) -> ProgramTransformerResult<()> { - let key = *account_update.pubkey().unwrap(); match &parsing_result.data { TokenMetadataAccountData::EmptyAccount => { - burn_v1_asset(db, key, account_update.slot()).await?; + burn_v1_asset(db, account_info.pubkey, account_info.slot).await?; Ok(()) } TokenMetadataAccountData::MasterEditionV1(m) => { let txn = db.begin().await?; - save_v1_master_edition(key, account_update.slot(), m, &txn).await?; + save_v1_master_edition(account_info.pubkey, account_info.slot, m, &txn).await?; txn.commit().await?; Ok(()) } TokenMetadataAccountData::MetadataV1(m) => { - if let Some(info) = - save_v1_asset(db, m.mint.as_ref().into(), account_update.slot(), m).await? - { + if let Some(info) = save_v1_asset(db, &m.mint, account_info.slot, m).await? { download_metadata_notifier(info) .await .map_err(ProgramTransformerError::DownloadMetadataNotify)?; @@ -45,7 +41,7 @@ pub async fn handle_token_metadata_account<'a, 'b, 'c>( } TokenMetadataAccountData::MasterEditionV2(m) => { let txn = db.begin().await?; - save_v2_master_edition(key, account_update.slot(), m, &txn).await?; + save_v2_master_edition(account_info.pubkey, account_info.slot, m, &txn).await?; txn.commit().await?; Ok(()) } @@ -53,6 +49,5 @@ pub async fn handle_token_metadata_account<'a, 'b, 'c>( // TokenMetadataAccountData::UseAuthorityRecord(_) => {} // TokenMetadataAccountData::CollectionAuthorityRecord(_) => {} _ => Err(ProgramTransformerError::NotImplemented), - }?; - Ok(()) + } } diff --git a/program_transformers/src/token_metadata/v1_asset.rs b/program_transformers/src/token_metadata/v1_asset.rs index 678e52594..414c4f324 100644 --- a/program_transformers/src/token_metadata/v1_asset.rs +++ b/program_transformers/src/token_metadata/v1_asset.rs @@ -20,25 +20,25 @@ use { json::ChainDataV1, }, num_traits::FromPrimitive, - plerkle_serialization::Pubkey as FBPubkey, sea_orm::{ entity::{ActiveValue, ColumnTrait, EntityTrait}, query::{Condition, JsonValue, QueryFilter, QueryTrait}, sea_query::query::OnConflict, ConnectionTrait, DbBackend, DbErr, TransactionTrait, }, + solana_sdk::pubkey::Pubkey, std::collections::HashSet, tracing::warn, }; pub async fn burn_v1_asset( conn: &T, - id: FBPubkey, + id: &Pubkey, slot: u64, ) -> ProgramTransformerResult<()> { - let (id, slot_i) = (id.0, slot as i64); + let (id, slot_i) = (id.to_bytes().to_vec(), slot as i64); let model = asset::ActiveModel { - id: ActiveValue::Set(id.to_vec()), + id: ActiveValue::Set(id), slot_updated: ActiveValue::Set(Some(slot_i)), burnt: ActiveValue::Set(true), ..Default::default() @@ -60,7 +60,7 @@ pub async fn burn_v1_asset( pub async fn save_v1_asset( conn: &T, - id: FBPubkey, + id: &Pubkey, slot: u64, metadata: &Metadata, ) -> ProgramTransformerResult> { @@ -70,7 +70,7 @@ pub async fn save_v1_asset( let (edition_attachment_address, _) = find_master_edition_account(&meta_mint_pubkey); let mint = metadata.mint.to_bytes().to_vec(); let authority = metadata.update_authority.to_bytes().to_vec(); - let id = id.0; + let id = id.to_bytes().to_vec(); let slot_i = slot as i64; let uri = data.uri.trim().replace('\0', ""); let _spec = SpecificationVersions::V1;