Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use transactions for asset table upserts #142

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions nft_ingester/src/program_transformers/bubblegum/burn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ where
..Default::default()
};

// Start a db transaction.
let multi_txn = txn.begin().await?;

// Upsert asset table `burnt` column.
let query = asset::Entity::insert(asset_model)
.on_conflict(
Expand All @@ -53,9 +56,12 @@ where
.to_owned(),
)
.build(DbBackend::Postgres);
txn.execute(query).await?;
multi_txn.execute(query).await?;

upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;

upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?;
// Close out transaction and relinqish the lock.
multi_txn.commit().await?;

return Ok(());
}
Expand Down
20 changes: 14 additions & 6 deletions nft_ingester/src/program_transformers/bubblegum/cancel_redeem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ where
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
#[allow(unreachable_patterns)]
return match le.schema {

match le.schema {
LeafSchema::V1 {
id,
owner,
Expand All @@ -47,9 +47,12 @@ where
let tree_id = cl.id.to_bytes();
let nonce = cl.index as i64;

// Start a db transaction.
let multi_txn = txn.begin().await?;

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info(
txn,
&multi_txn,
id_bytes.to_vec(),
nonce,
tree_id.to_vec(),
Expand All @@ -62,17 +65,22 @@ where

// Partial update of asset table with just leaf owner and delegate.
upsert_asset_with_owner_and_delegate_info(
txn,
&multi_txn,
id_bytes.to_vec(),
owner_bytes,
delegate,
seq as i64,
)
.await?;

upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await
upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;

// Close out transaction and relinqish the lock.
multi_txn.commit().await?;

return Ok(());
}
};
}
}
Err(IngesterError::ParsingError(
"Ix not parsed correctly".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ where
let tree_id = cl.id.to_bytes();
let nonce = cl.index as i64;

// Start a db transaction.
let multi_txn = txn.begin().await?;

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info(
txn,
&multi_txn,
id_bytes.to_vec(),
nonce,
tree_id.to_vec(),
Expand All @@ -66,10 +69,10 @@ where
)
.await?;

upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?;
upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;

upsert_collection_info(
txn,
&multi_txn,
id_bytes.to_vec(),
Some(Collection {
key: *collection,
Expand All @@ -80,6 +83,9 @@ where
)
.await?;

// Close out transaction and relinqish the lock.
multi_txn.commit().await?;

return Ok(());
};
Err(IngesterError::ParsingError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ where
let tree_id = cl.id.to_bytes();
let nonce = cl.index as i64;

// Start a db transaction.
let multi_txn = txn.begin().await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is too late to start the TXN. The TXN should include asset_should_be_updated and asset_should_be_updated should be using SELECT ... FOR UPDATE instead of the SELECT only. This would allow you to lock the rows you are selecting and fail transactions that cause a raise condition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, I see how the txn could solve issue with asset_should_be_updated

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition fixed in #134


// Partial update of asset table with just leaf info.
upsert_asset_with_leaf_info(
txn,
&multi_txn,
id_bytes.to_vec(),
nonce,
tree_id.to_vec(),
Expand All @@ -82,15 +85,18 @@ where

// Partial update of asset table with just leaf owner and delegate.
upsert_asset_with_owner_and_delegate_info(
txn,
&multi_txn,
id_bytes.to_vec(),
owner_bytes,
delegate,
seq as i64,
)
.await?;

upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?;
upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;

// Close out transaction and relinqish the lock.
multi_txn.commit().await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to know how to handle errors here. what if the transaction fails? Need to have recovery scenarios in that acse surely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the way it works with SeaORM, if the the transaction (multi_txn in this case) goes out of scope, it is rolled back. Therefore, the way the code is structured, if there is an error on one of the executions of the multiple statement transaction, there will be an early return, multi_txn will go out of scope and the transaction will be rolled back.

The reported error will be the one that caused the early return, which is how it functions today. So I think the only difference will be that the asset table changes are tied together, and in the case of failure, all of them will be rolled back together. I don't know if this could be considered better behavior than today or not; it seems about equivalent to me, because either way some of the asset updates for an instruction were missed, and the asset state overall would be missing some updates.


id_bytes.to_vec()
}
Expand Down
14 changes: 11 additions & 3 deletions nft_ingester/src/program_transformers/bubblegum/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,25 @@ where
return Ok(());
}

// Start a db transaction.
let multi_txn = txn.begin().await?;

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info_for_decompression(txn, id_bytes.to_vec()).await?;
upsert_asset_with_leaf_info_for_decompression(&multi_txn, id_bytes.to_vec()).await?;

upsert_asset_with_compression_info(
txn,
&multi_txn,
id_bytes.to_vec(),
false,
false,
1,
Some(id_bytes.to_vec()),
true,
)
.await
.await?;

// Close out transaction and relinqish the lock.
multi_txn.commit().await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to ensure that we know fully what happens if the TX fails and the reason for the TX failing. Is this a recoverable TX error or not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does my explanation above for how SeaORM transactions are rolled back when the object goes out of scope answer the question? Is there some type of recovery we should aim to do, which is different then the current code that afaik just ignores errors and continues on?


Ok(())
}
16 changes: 12 additions & 4 deletions nft_ingester/src/program_transformers/bubblegum/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ where
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
return match le.schema {
match le.schema {
LeafSchema::V1 {
id,
owner,
Expand All @@ -45,9 +45,12 @@ where
};
let tree_id = cl.id.to_bytes();

// Start a db transaction.
let multi_txn = txn.begin().await?;
danenbm marked this conversation as resolved.
Show resolved Hide resolved

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info(
txn,
&multi_txn,
id_bytes.to_vec(),
cl.index as i64,
tree_id.to_vec(),
Expand All @@ -60,15 +63,20 @@ where

// Partial update of asset table with just leaf owner and delegate.
upsert_asset_with_owner_and_delegate_info(
txn,
&multi_txn,
id_bytes.to_vec(),
owner_bytes,
delegate,
seq as i64,
)
.await?;

upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await
upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;

// Close out transaction and relinqish the lock.
multi_txn.commit().await?;
danenbm marked this conversation as resolved.
Show resolved Hide resolved

return Ok(());
}
};
}
Expand Down
20 changes: 13 additions & 7 deletions nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,13 @@ where
.await?;

// Upsert into `asset` table.
// Start a db transaction.
let multi_txn = txn.begin().await?;
danenbm marked this conversation as resolved.
Show resolved Hide resolved

// Set base mint info.
let tree_id = bundle.keys.get(3).unwrap().0.to_vec();
unprotected_upsert_asset_base_info(
txn,
&multi_txn,
id_bytes.to_vec(),
OwnerType::Single,
false,
Expand All @@ -124,7 +126,7 @@ where

// Partial update of asset table with just compression info elements.
upsert_asset_with_compression_info(
txn,
&multi_txn,
id_bytes.to_vec(),
true,
false,
Expand All @@ -136,7 +138,7 @@ where

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info(
txn,
&multi_txn,
id_bytes.to_vec(),
nonce as i64,
tree_id,
Expand All @@ -154,15 +156,21 @@ where
Some(delegate.to_bytes().to_vec())
};
upsert_asset_with_owner_and_delegate_info(
txn,
&multi_txn,
id_bytes.to_vec(),
owner.to_bytes().to_vec(),
delegate,
seq as i64,
)
.await?;

upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?;
upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;

upsert_asset_with_update_metadata_seq(&multi_txn, id_bytes.to_vec(), seq as i64)
.await?;

// Close out transaction and relinqish the lock.
multi_txn.commit().await?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not include the other updates below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to lock all the different tables at first. Starting with asset seemed like a good incremental improvement since it had the most upserts.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorporated all these changes into #134


// Upsert into `asset_v1_account_attachments` table.
let (edition_attachment_address, _) = find_master_edition_account(&id);
Expand Down Expand Up @@ -205,8 +213,6 @@ where
)
.await?;

upsert_asset_with_update_metadata_seq(txn, id_bytes.to_vec(), seq as i64).await?;

if uri.is_empty() {
warn!(
"URI is empty for mint {}. Skipping background task.",
Expand Down
10 changes: 8 additions & 2 deletions nft_ingester/src/program_transformers/bubblegum/redeem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ where
let tree_id = cl.id.to_bytes();
let nonce = cl.index as i64;

// Start a db transaction.
let multi_txn = txn.begin().await?;

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info(
txn,
&multi_txn,
id_bytes.to_vec(),
nonce,
tree_id.to_vec(),
Expand All @@ -56,7 +59,10 @@ where
)
.await?;

upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await?;
upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;

// Close out transaction and relinqish the lock.
multi_txn.commit().await?;

return Ok(());
}
Expand Down
18 changes: 13 additions & 5 deletions nft_ingester/src/program_transformers/bubblegum/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ where
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
#[allow(unreachable_patterns)]
return match le.schema {

match le.schema {
LeafSchema::V1 {
id,
owner,
Expand All @@ -48,9 +48,12 @@ where
let tree_id = cl.id.to_bytes();
let nonce = cl.index as i64;

// Start a db transaction.
let multi_txn = txn.begin().await?;

// Partial update of asset table with just leaf.
upsert_asset_with_leaf_info(
txn,
&multi_txn,
id_bytes.to_vec(),
nonce,
tree_id.to_vec(),
Expand All @@ -63,15 +66,20 @@ where

// Partial update of asset table with just leaf owner and delegate.
upsert_asset_with_owner_and_delegate_info(
txn,
&multi_txn,
id_bytes.to_vec(),
owner_bytes,
delegate,
seq as i64,
)
.await?;

upsert_asset_with_seq(txn, id_bytes.to_vec(), seq as i64).await
upsert_asset_with_seq(&multi_txn, id_bytes.to_vec(), seq as i64).await?;

// Close out transaction and relinqish the lock.
multi_txn.commit().await?;

return Ok(());
}
};
}
Expand Down
Loading
Loading