feat: batch mint indexing #198
Conversation
pub async fn persist_rollups(&self) {
    loop {
        tokio::time::sleep(Duration::from_secs(5)).await;
Have you considered doing a trigger/notification when a row is added to the rollup_to_verify table? That would probably work better than a hardcoded delay, both when there are no rollups and when there are spikes of multiple new entries.
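For illustration, here is a minimal sketch of that idea using Postgres LISTEN/NOTIFY, assuming sqlx's PgListener (which sea-orm builds on). The trigger, function, and channel names are hypothetical and not taken from this PR:

```rust
use sqlx::postgres::PgListener;

// Hypothetical one-time migration: emit a NOTIFY for every insert into
// rollup_to_verify (names are illustrative).
const CREATE_TRIGGER_SQL: &str = r#"
CREATE OR REPLACE FUNCTION notify_rollup_insert() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('rollup_to_verify_inserts', NEW.file_hash);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER rollup_to_verify_notify
AFTER INSERT ON rollup_to_verify
FOR EACH ROW EXECUTE FUNCTION notify_rollup_insert();
"#;

// A worker blocks on the notification instead of polling on a fixed delay.
async fn wait_for_new_rollups(database_url: &str) -> Result<(), sqlx::Error> {
    let mut listener = PgListener::connect(database_url).await?;
    listener.listen("rollup_to_verify_inserts").await?;
    loop {
        let notification = listener.recv().await?;
        println!("rollup_to_verify insert: {}", notification.payload());
        // ...kick off get_rollup_to_verify() / persist_rollup() here...
    }
}
```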
Thank you! I added a listener for inserts into the rollup_to_verify table. Initially I was thinking about sharing a single listener between multiple instances of RollupPersister, but since most listener methods take a mutable reference (&mut self), I decided to create a new listener for each RollupPersister. I hope that, together with the StartProcessing state, this will not cause any race conditions.
I was also thinking about using a single listener and fanning notifications out through channels, but mpsc is awkward for this purpose because we need to deliver notifications to multiple consumers/workers. There is also a community crate that implements Golang-style channels with multiple consumers for Rust, but I think pulling in an additional dependency is bad practice in our case. WDYT?
I think there could be a race condition within get_rollup_to_verify():

1. Both processes start a transaction, and both can use a SELECT query to find the same row in rollups_to_verify, i.e. a row that is not in the StartProcessing state.
2. Both processes attempt to update the selected row in rollups_to_verify, which does implicitly lock the row. The second process waits for the lock on the row to be released.
3. The first process commits the transaction and releases the lock. The second process successfully updates the row again.
4. Now both processes move to persist_rollup() and can race each other in their respective state machines.
I think there are a few options to solve this issue:

1. You could figure out the single-listener approach, and then each process would receive a notification. However, if multiple rows get added at nearly the same time, there will be multiple triggers at nearly the same time. From what I can tell there is no specific rollup_to_verify row associated with a trigger, so the multiple processes could still find the same row, causing the same concurrency issue.
2. You could use a multi-consumer/multi-producer channel. I agree that mpsc is incorrect for this purpose. I agree the community crate is not right either, as its README says it is end-of-life and deprecated. The Tokio tutorial at https://tokio.rs/tokio/tutorial/channels suggests the async-channel crate for multi-consumer/multi-producer, so you could look into that (see the sketch after this list).
3. My recommended option is this third one: modify get_rollup_to_verify() to do a SELECT ... FOR UPDATE. I believe you can accomplish this by simply adding lock_for_update() to the SELECT:

   let rollup_to_verify = rollup_to_verify::Entity::find()
       .filter(condition)
       .order_by_asc(rollup_to_verify::Column::CreatedAtSlot)
   +   .lock_for_update() // Lock the row for the current transaction
       .one(&multi_txn)
       .await
       .map_err(|e| ProgramTransformerError::DatabaseError(e.to_string()))?;

   Then I think persist_rollup also needs to be modified to NOT service rollups in the ReceivedTransaction state. Otherwise I think it will still grab anything that was just ingested by the program transformer.
4. I think there could be multiple other database strategies, such as "serializable transactions" (that one probably causes too much of a performance hit). I think what I suggest in option 3 will work, and there are other Postgres resources on locking if you want to deep-dive this topic.
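For option 2, a minimal sketch of the async-channel idea (the worker count and unit payload are assumptions, not code from this PR):

```rust
use std::time::Duration;

// async-channel receivers can be cloned, and each message is delivered to
// exactly one of the waiting consumers, unlike tokio's mpsc.
#[tokio::main]
async fn main() {
    let (tx, rx) = async_channel::unbounded::<()>();

    for worker_id in 0..3 {
        let rx = rx.clone();
        tokio::spawn(async move {
            while rx.recv().await.is_ok() {
                println!("worker {worker_id} woke up to check rollup_to_verify");
            }
        });
    }

    // The single Postgres listener would push one unit value per NOTIFY.
    tx.send(()).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await; // let a worker run
}
```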
Thank you very much!
No problem! LGTM with the lock (option 3) in latest rev.
let Ok(rollup) = rollup
    .map(|r| bincode::deserialize::<Rollup>(r.rollup_binary_bincode.as_slice()))
    .transpose()
    .map_err(|e| ProgramTransformerError::DeserializationError(e.to_string()))
else {
    continue;
};
In the case of an error, will it keep trying to deserialize this rollup in subsequent iterations of the loop, because it is stuck in RollupPersistingState::ReceivedTransaction and never deleted from the table?
Great catch, thank you!
I think the sequence would be: if this deserialization failed, it would default to None, which would be passed to persist_rollup, which would re-download and serialize it, and update the row in the rollup table with a new FileHash and RollupBinaryBincode, which we would assume would fix the issue. Is this the correct understanding?

But now looking at it more, when is this code even expected to run? In the outer persist_rollups process, it should get a rollup to verify from the rollup_to_verify table and at the same time attempt to get the downloaded rollup from the rollup table. But it doesn't seem like the Rollup should be populated in the rollup table until the persist_rollup function is called, which actually does the download.
I still don't fully understand when this code would be run to deserialize a rollup. It seems like it would only be on an error case, because it wouldn't be populated until download_rollup occurs?
    .get(1)
    .ok_or(BlockbusterError::InstructionParsingError)?;
let staker = *keys
    .get(2)
This should be 4 according to the latest Bubblegum program version, I mean the version with the rollup changes.
use crate::error::RollupValidationError;
The code in this file looks like the code in the SDK. Can we import it from there?
pub leaf_update: LeafSchema,
pub mint_args: MetadataArgs,
#[serde(with = "serde_with::As::<serde_with::DisplayFromStr>")]
pub authority: Pubkey,
There should also be a creator_signature here, but I guess we can add it as a separate ticket/PR. Task added to the backlog.
Yep, I think we can remove all of this code and just import it from the SDK, but in the scope of a separate task. That will also add creator_signature.
#[derive(Iden)]
enum RollupToVerify {
    Table,
    Url,
    FileHash,
    CreatedAtSlot,
    Signature,
    DownloadAttempts,
    RollupPersistingState,
    RollupFailStatus,
    Staker,
}

#[derive(Iden, Debug, PartialEq, Sequence)]
enum PersistingRollupState {
    ReceivedTransaction,
    FailedToPersist,
    StartProcessing,
    SuccessfullyDownload,
    SuccessfullyValidate,
    StoredUpdate,
}

#[derive(Iden, Debug, PartialEq, Sequence)]
enum FailedRollupState {
    DownloadFailed,
    ChecksumVerifyFailed,
    RollupVerifyFailed,
    FileSerialization,
}

#[derive(Iden)]
enum Rollup {
    Table,
    FileHash,
    RollupBinaryBincode,
}
Not sure if it matters but should these be under the model directory and then imported?
.on_conflict(
    OnConflict::columns([digital_asset_types::dao::rollup_to_verify::Column::FileHash])
        .update_columns([digital_asset_types::dao::rollup_to_verify::Column::Url])
        .update_columns([digital_asset_types::dao::rollup_to_verify::Column::Signature])
        .update_columns([
            digital_asset_types::dao::rollup_to_verify::Column::DownloadAttempts,
        ])
        .update_columns([
            digital_asset_types::dao::rollup_to_verify::Column::RollupFailStatus,
        ])
        .update_columns([
            digital_asset_types::dao::rollup_to_verify::Column::RollupPersistingState,
        ])
        .update_columns([digital_asset_types::dao::rollup_to_verify::Column::CreatedAtSlot])
        .to_owned(),
)
Should staker now be included in the ON CONFLICT section?
nft_ingester/src/rollup_updates.rs
if let Err(e) = listener.recv().await {
    error!("Recv rollup notification: {}", e);
    tokio::time::sleep(Duration::from_secs(5)).await;
    continue;
}
if let Err(e) = s.send(()).await {
    error!("Send rollup notification: {}", e);
    tokio::time::sleep(Duration::from_secs(5)).await;
}
What is the benefit of waiting 5 seconds if there is a send or receive error? Doesn't it drop the problematic message and move on? Why wait 5 seconds before doing that?
let condition = Condition::all()
    .add(
        rollup_to_verify::Column::RollupPersistingState
            .ne(RollupPersistingState::FailedToPersist),
    )
    .add(
        rollup_to_verify::Column::RollupPersistingState
            .ne(RollupPersistingState::StoredUpdate),
    )
    .add(
        rollup_to_verify::Column::RollupPersistingState
            .ne(RollupPersistingState::StartProcessing),
    );
What if the state of the rollup is SuccessfullyDownload or SuccessfullyValidate? It could get picked up here by another processor and put back into StartProcessing?

Instead, shouldn't this condition just be .eq(RollupPersistingState::ReceivedTransaction)?
If the state of the rollup is SuccessfullyDownload or SuccessfullyValidate, then it will be picked up here by a processor and continue processing from that state.
Oh yeah, I see. I somehow thought it would get reset.
&RollupPersistingState::ReceivedTransaction => {
    // We get ReceivedTransaction state on the start of processing
    rollup_to_verify.rollup_persisting_state =
        RollupPersistingState::StartProcessing;
This case should not be possible, right? Anything returned by get_rollup_to_verify() should be moved out of this state before being sent as a parameter to persist_rollup.
);

pub fn make_concurrent_merkle_tree(
    max_dapth: u32,
Nit: typo
pub fn validate_change_logs(
    max_depth: u32,
    max_buffer_size: u32,
    leafs: &[[u8; 32]],
Nit: should be leaves
let r = rollup_to_verify::Entity::find()
    .filter(rollup_to_verify::Column::FileHash.eq(metadata_hash.clone()))
    .one(setup.db.as_ref())
    .await
    .unwrap()
    .unwrap();

assert_eq!(r.file_hash, metadata_hash);
assert_eq!(r.url, metadata_url);
wdyt about checking all the expected columns that should have been inserted?
let query = rollup_to_verify::Entity::insert(rollup_to_verify)
    .on_conflict(
        OnConflict::columns([rollup_to_verify::Column::FileHash])
            .update_columns([rollup_to_verify::Column::Url])
            .update_columns([rollup_to_verify::Column::Signature])
            .update_columns([rollup_to_verify::Column::DownloadAttempts])
            .update_columns([rollup_to_verify::Column::RollupFailStatus])
            .update_columns([rollup_to_verify::Column::RollupPersistingState])
            .update_columns([rollup_to_verify::Column::CreatedAtSlot])
            .to_owned(),
    )
    .build(DbBackend::Postgres);
setup.db.execute(query).await.unwrap();
Should the staker column be incorporated into the test now?
}
&RollupPersistingState::SuccessfullyValidate => {
    if let Some(r) = &rollup {
        // TODO: Add retry?
I don't think we do a retry for normal Bubblegum processing, but did you want to add a retry here? It seems like we don't need it, since we don't do it for existing Bubblegum transaction processing.
Unless there are significant changes, I believe I've completed my reviews, and once the last open comments are resolved the change looks good to me. I really like the thoughtful design and good testing that was added for this feature.
Next steps for this PR would be:
- Resolve merge conflicts with main, release dependencies and update this PR to use released packages.
- Send PR out to the wider RPC community, discuss on the RPC channel we have, get approval and then merge to main.
As far as overall naming goes, would batch_operation_persistor (and in general batch_operation_*) be a better fit than rollup_persistor? Or are there other options discussed for naming besides rollup?
bubblegum::mint_v1::mint_v1(
    &rolled_mint.into(),
    &InstructionBundle {
        txn_id: &signature,
        program: Default::default(),
        instruction: None,
        inner_ix: None,
        keys: &[],
        slot,
    },
    txn,
    "CreateTreeWithRoot",
    false,
)
So we've already talked about this indirectly in a previous comment, but I wanted to consider it directly.

The transactions sent for finalize_merkle_tree_with_root will all have the same sequence number of 1. The normal backfilling process for a tree searches for gaps in the sequence numbers to detect missing transactions. In this case, the batch mint/rollup is downloaded and verified by the indexing process, so later gap filling is irrelevant.

The end result is that there will be multiple leaves for the same tree that share the same sequence number. I cannot think of any issues with that, but I wanted to leave the comment just to share the thought process, in case you can think of any system issues with the shared sequence numbers or backfilling.
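As a toy illustration of the gap-detection behavior mentioned above (not the actual backfiller code): duplicate sequence numbers contribute no gaps, so leaves that all carry sequence 1 would not trigger backfilling.

```rust
// Toy model of "search for gaps in the sequence numbers": returns the sequence
// numbers missing between the smallest and largest observed values.
fn missing_sequences(mut seqs: Vec<u64>) -> Vec<u64> {
    seqs.sort_unstable();
    seqs.dedup();
    let mut missing = Vec::new();
    for pair in seqs.windows(2) {
        for s in (pair[0] + 1)..pair[1] {
            missing.push(s);
        }
    }
    missing
}

// missing_sequences(vec![1, 1, 1, 3]) == vec![2]: the repeated 1s from a batch
// mint add no new gaps for a backfiller to chase.
```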
- feat: add creators and collection batch mint verifications
- style: fmt
- Update blockbuster/blockbuster/src/programs/bubblegum/mod.rs (Co-authored-by: Stanislav Cherviakov <[email protected]>)
- chore: dependency change
- chore: renaming

Co-authored-by: Stanislav Cherviakov <[email protected]>

- feat: add skip batch mint indexing feature
- fix: small fixes
- chore: add comment and variable renaming
This PR aims to add indexing for the FinalizeTreeWithRoot instruction in DAS-API.
Notice: As we need to integrate this instruction into other packages (blockbuster, spl-compression, etc.), we are currently using local forks of these packages. In the future, these will be replaced by standard imports.
The FinalizeTreeWithRoot instruction has a unique characteristic that prevents it from being processed in the same way as other Bubblegum instructions. It represents a batch mint action, and the potential size of this batch can reach many millions of assets. Processing this instruction inline with others could block them in the queue until the FinalizeTreeWithRoot processing is complete, which can take a considerable amount of time.
To address this issue, we decided to create a separate queue for processing rollups, similar to our existing task processing system. When we receive a FinalizeTreeWithRoot instruction update, we add a new rollup to the queue, which is then processed in a separate process.
To represent the rollups queue, we created the rollup_to_verify table in Postgres, while downloaded rollups are stored in the rollup table. We store downloaded rollups for several reasons.

For rollup processing, we implemented a finite state machine (FSM) with the following states: ReceivedTransaction, SuccessfullyDownload, SuccessfullyValidate, StoredUpdate, and FailedToPersist. StoredUpdate and FailedToPersist are the final states, representing the successful and unsuccessful processing cases, respectively. If we encounter a FailedToPersist state, the rollup_fail_status column in the rollup_to_verify table stores an enum representing the reason for the failure. Possible values of this enum are: ChecksumVerifyFailed (the hash calculated during processing and the hash received from the transaction differ), DownloadFailed, FileSerialization (invalid JSON), and RollupVerifyFailed (invalid tree root, leaf pubkey, etc.).

The initial state for a rollup is ReceivedTransaction. If the rollup is already stored in the database, we retrieve it, cast it to the Rollup structure in the code, and move to the next state. Otherwise, we need to download it using the URL received from the transaction.
The next state is SuccessfullyDownload. Here, we need to validate that the rollup contains a valid tree. For this purpose, we use the ITree trait, which abstracts ConcurrentMerkleTree<DEPTH, BUF_SIZE> regardless of the const generic parameters. This abstraction is necessary for working comfortably with ConcurrentMerkleTree without causing stack overflow (detailed reasons are described in the code comments in the merkle_tree_wrapper.rs file). If validation completes successfully, the rollup transitions to the SuccessfullyValidate state.
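For readers unfamiliar with the pattern, here is a rough sketch of the const-generic erasure that ITree enables. The trait, wrapper, and supported (depth, buffer) pairs below are illustrative; the real definitions live in merkle_tree_wrapper.rs and wrap ConcurrentMerkleTree<DEPTH, BUF_SIZE>:

```rust
// A non-generic trait hides the const generic parameters behind a trait object.
trait ITree {
    fn depth(&self) -> u32;
    fn buffer_size(&self) -> u32;
}

struct TreeWrapper<const DEPTH: u32, const BUF: u32>;

impl<const DEPTH: u32, const BUF: u32> ITree for TreeWrapper<DEPTH, BUF> {
    fn depth(&self) -> u32 { DEPTH }
    fn buffer_size(&self) -> u32 { BUF }
}

// Runtime values pick the concrete instantiation once; the rest of the code
// only sees Box<dyn ITree> and never names the const generics again.
fn make_tree(max_depth: u32, max_buffer_size: u32) -> Option<Box<dyn ITree>> {
    match (max_depth, max_buffer_size) {
        (14, 64) => Some(Box::new(TreeWrapper::<14, 64>)),
        (20, 256) => Some(Box::new(TreeWrapper::<20, 256>)),
        (24, 1024) => Some(Box::new(TreeWrapper::<24, 1024>)),
        _ => None,
    }
}
```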
If the rollup is SuccessfullyValidate, we can process all the assets inside it by iterating over them and calling the mint_v1 instruction handler. Once all assets are processed, the rollup transitions to the final StoredUpdate state. If a failure occurs at any step, the rollup will enter the FailedToPersist state. Any step may be retried.
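Putting the states together, a schematic of the success path described above (not the actual persister code; DB updates, retries, and error details are elided):

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum RollupPersistingState {
    ReceivedTransaction,
    SuccessfullyDownload,
    SuccessfullyValidate,
    StoredUpdate,    // terminal: success
    FailedToPersist, // terminal: failure, reason recorded in rollup_fail_status
}

fn next_on_success(state: RollupPersistingState) -> RollupPersistingState {
    use RollupPersistingState::*;
    match state {
        ReceivedTransaction => SuccessfullyDownload, // downloaded, or loaded from the rollup table
        SuccessfullyDownload => SuccessfullyValidate, // Merkle tree validation passed
        SuccessfullyValidate => StoredUpdate,        // every asset persisted via mint_v1
        StoredUpdate | FailedToPersist => state,     // terminal states stay put
    }
}
```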
The persist_rollups process runs in a single worker in the nft_ingester/src/main.rs file. While we can run it with multiple workers, it can be very RAM-intensive (a single rollup may be many GB in size).