diff --git a/nft_ingester/src/program_transformers/bubblegum/burn.rs b/nft_ingester/src/program_transformers/bubblegum/burn.rs index adca1324f..74d8efc74 100644 --- a/nft_ingester/src/program_transformers/bubblegum/burn.rs +++ b/nft_ingester/src/program_transformers/bubblegum/burn.rs @@ -45,6 +45,9 @@ where ..Default::default() }; + // Start a db transaction. + let multi_txn = txn.begin().await?; + // Upsert asset table `burnt` column. let query = asset::Entity::insert(asset_model) .on_conflict( @@ -53,9 +56,12 @@ where .to_owned(), ) .build(DbBackend::Postgres); - txn.execute(query).await?; + multi_txn.execute(query).await?; + + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; return Ok(()); } diff --git a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs index 9b9d83525..174b051ff 100644 --- a/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs @@ -22,8 +22,8 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - #[allow(unreachable_patterns)] - return match le.schema { + + match le.schema { LeafSchema::V1 { id, owner, @@ -47,9 +47,12 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Start a db transaction. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -62,7 +65,7 @@ where // Partial update of asset table with just leaf owner and delegate. upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner_bytes, delegate, @@ -70,9 +73,14 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; + + return Ok(()); } - }; + } } Err(IngesterError::ParsingError( "Ix not parsed correctly".to_string(), diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index 0d6dc828d..535e366e8 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -53,9 +53,12 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Start a db transaction. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -66,10 +69,10 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; upsert_collection_info( - txn, + &multi_txn, id_bytes.to_vec(), Some(Collection { key: *collection, @@ -80,6 +83,9 @@ where ) .await?; + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; + return Ok(()); }; Err(IngesterError::ParsingError( diff --git a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs index 85dece67e..8241007e4 100644 --- a/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/creator_verification.rs @@ -67,9 +67,12 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Start a db transaction. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf info. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -82,7 +85,7 @@ where // Partial update of asset table with just leaf owner and delegate. upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner_bytes, delegate, @@ -90,7 +93,10 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; id_bytes.to_vec() } diff --git a/nft_ingester/src/program_transformers/bubblegum/decompress.rs b/nft_ingester/src/program_transformers/bubblegum/decompress.rs index 4bee5cc0c..ff74ae9f1 100644 --- a/nft_ingester/src/program_transformers/bubblegum/decompress.rs +++ b/nft_ingester/src/program_transformers/bubblegum/decompress.rs @@ -23,11 +23,14 @@ where return Ok(()); } + // Start a db transaction. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. - upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?; + upsert_asset_with_leaf_info_for_decompression(&multi_txn, id_bytes.to_vec()).await?; upsert_asset_with_compression_info( - txn, + &multi_txn, id_bytes.to_vec(), false, false, @@ -35,5 +38,10 @@ where Some(id_bytes.to_vec()), true, ) - .await + .await?; + + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; + + Ok(()) } diff --git a/nft_ingester/src/program_transformers/bubblegum/delegate.rs b/nft_ingester/src/program_transformers/bubblegum/delegate.rs index 4f0b97e72..f5a5d54a9 100644 --- a/nft_ingester/src/program_transformers/bubblegum/delegate.rs +++ b/nft_ingester/src/program_transformers/bubblegum/delegate.rs @@ -22,7 +22,7 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - return match le.schema { + match le.schema { LeafSchema::V1 { id, owner, @@ -45,9 +45,12 @@ where }; let tree_id = cl.id.to_bytes(); + // Start a db transaction. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), cl.index as i64, tree_id.to_vec(), @@ -60,7 +63,7 @@ where // Partial update of asset table with just leaf owner and delegate. upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner_bytes, delegate, @@ -68,7 +71,12 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; + + return Ok(()); } }; } diff --git a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs index 6874cb5ef..8a63591b4 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mint_v1.rs @@ -105,11 +105,13 @@ where .await?; // Upsert into `asset` table. + // Start a db transaction. + let multi_txn = txn.begin().await?; // Set base mint info. let tree_id = bundle.keys.get(3).unwrap().0.to_vec(); unprotected_upsert_asset_base_info( - txn, + &multi_txn, id_bytes.to_vec(), OwnerType::Single, false, @@ -124,7 +126,7 @@ where // Partial update of asset table with just compression info elements. upsert_asset_with_compression_info( - txn, + &multi_txn, id_bytes.to_vec(), true, false, @@ -136,7 +138,7 @@ where // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce as i64, tree_id, @@ -154,7 +156,7 @@ where Some(delegate.to_bytes().to_vec()) }; upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner.to_bytes().to_vec(), delegate, @@ -162,7 +164,13 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + upsert_asset_with_update_metadata_seq(&multi_txn, id_bytes.to_vec(), seq as i64) + .await?; + + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; // Upsert into `asset_v1_account_attachments` table. let (edition_attachment_address, _) = find_master_edition_account(&id); @@ -205,8 +213,6 @@ where ) .await?; - upsert_asset_with_update_metadata_seq(txn, id_bytes.to_vec(), seq as i64).await?; - if uri.is_empty() { warn!( "URI is empty for mint {}. Skipping background task.", diff --git a/nft_ingester/src/program_transformers/bubblegum/redeem.rs b/nft_ingester/src/program_transformers/bubblegum/redeem.rs index ce45620e5..b2af55450 100644 --- a/nft_ingester/src/program_transformers/bubblegum/redeem.rs +++ b/nft_ingester/src/program_transformers/bubblegum/redeem.rs @@ -43,9 +43,12 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Start a db transaction. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -56,7 +59,10 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; return Ok(()); } diff --git a/nft_ingester/src/program_transformers/bubblegum/transfer.rs b/nft_ingester/src/program_transformers/bubblegum/transfer.rs index 2d74707f1..e9017ab6d 100644 --- a/nft_ingester/src/program_transformers/bubblegum/transfer.rs +++ b/nft_ingester/src/program_transformers/bubblegum/transfer.rs @@ -23,8 +23,8 @@ where { if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?; - #[allow(unreachable_patterns)] - return match le.schema { + + match le.schema { LeafSchema::V1 { id, owner, @@ -48,9 +48,12 @@ where let tree_id = cl.id.to_bytes(); let nonce = cl.index as i64; + // Start a db transaction. + let multi_txn = txn.begin().await?; + // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce, tree_id.to_vec(), @@ -63,7 +66,7 @@ where // Partial update of asset table with just leaf owner and delegate. upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner_bytes, delegate, @@ -71,7 +74,12 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; + + return Ok(()); } }; } diff --git a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs index 2dd04c384..9e108a764 100644 --- a/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs +++ b/nft_ingester/src/program_transformers/bubblegum/update_metadata.rs @@ -146,6 +146,8 @@ where .await?; // Upsert into `asset` table. + // Start a db transaction. + let multi_txn = txn.begin().await?; // Set base mint info. let tree_id = bundle.keys.get(5).unwrap().0.to_vec(); @@ -157,7 +159,7 @@ where }; unprotected_upsert_asset_base_info( - txn, + &multi_txn, id_bytes.to_vec(), OwnerType::Single, false, @@ -172,7 +174,7 @@ where // Partial update of asset table with just compression info elements. upsert_asset_with_compression_info( - txn, + &multi_txn, id_bytes.to_vec(), true, false, @@ -184,7 +186,7 @@ where // Partial update of asset table with just leaf. upsert_asset_with_leaf_info( - txn, + &multi_txn, id_bytes.to_vec(), nonce as i64, tree_id, @@ -202,7 +204,7 @@ where Some(delegate.to_bytes().to_vec()) }; upsert_asset_with_owner_and_delegate_info( - txn, + &multi_txn, id_bytes.to_vec(), owner.to_bytes().to_vec(), delegate, @@ -210,7 +212,13 @@ where ) .await?; - upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?; + upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?; + + upsert_asset_with_update_metadata_seq(&multi_txn, id_bytes.to_vec(), seq as i64) + .await?; + + // Close out transaction and relinqish the lock. + multi_txn.commit().await?; // Upsert into `asset_v1_account_attachments` table. let (edition_attachment_address, _) = find_master_edition_account(&id); @@ -263,8 +271,6 @@ where ) .await?; - upsert_asset_with_update_metadata_seq(txn, id_bytes.to_vec(), seq as i64).await?; - if uri.is_empty() { warn!( "URI is empty for mint {}. Skipping background task.",