Skip to content

Commit

Permalink
Helius redeem txn (#95)
Browse files Browse the repository at this point in the history
* fix(ingester): handle redeem txns

* more important conflict fixes

---------

Co-authored-by: Nicolas Pennie <[email protected]>
  • Loading branch information
linuskendall and NicolasPennie authored Jul 27, 2023
1 parent fec8d0f commit b253aab
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 64 deletions.
14 changes: 3 additions & 11 deletions nft_ingester/src/program_transformers/bubblegum/burn.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{save_changelog_event, update_asset};
use crate::error::IngesterError;
use super::{save_changelog_event, update_compressed_asset};
use crate::{error::IngesterError, program_transformers::bubblegum::u32_to_u8_array};
use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction};
use digital_asset_types::dao::asset;
use log::debug;
Expand Down Expand Up @@ -35,18 +35,10 @@ where
};
// Don't send sequence number with this update, because we will always
// run this update even if it's from a backfill/replay.
update_asset(txn, id_bytes, None, asset_to_update).await?;
update_compressed_asset(txn, id_bytes, None, asset_to_update).await?;
return Ok(());
}
Err(IngesterError::ParsingError(
"Ix not parsed correctly".to_string(),
))
}

// PDA lookup requires an 8-byte array.
fn u32_to_u8_array(value: u32) -> [u8; 8] {
let bytes: [u8; 4] = value.to_le_bytes();
let mut result: [u8; 8] = [0; 8];
result[..4].copy_from_slice(&bytes);
result
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{save_changelog_event, update_asset};
use super::{save_changelog_event, update_compressed_asset};
use crate::error::IngesterError;
use blockbuster::{
instruction::InstructionBundle,
Expand Down Expand Up @@ -39,8 +39,8 @@ where
seq: Set(seq as i64), // gummyroll seq
..Default::default()
};
update_asset(txn, id_bytes, Some(seq), asset_to_update).await
} // _ => Err(IngesterError::NotImplemented),
update_compressed_asset(txn, id_bytes, Some(seq), asset_to_update).await
}
};
}
Err(IngesterError::ParsingError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use blockbuster::{
use digital_asset_types::dao::{asset, asset_grouping};
use sea_orm::{entity::*, query::*, sea_query::OnConflict, DbBackend, Set, Unchanged};

use super::{save_changelog_event, update_asset};
use super::{save_changelog_event, update_compressed_asset};
use crate::error::IngesterError;
pub async fn process<'c, T>(
parsing_result: &BubblegumInstruction,
Expand All @@ -30,7 +30,7 @@ where
seq: Set(seq as i64),
..Default::default()
};
update_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?;
update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?;

if verify {
if let Some(Payload::SetAndVerifyCollection { collection }) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ use mpl_bubblegum::{
};
use sea_orm::{ConnectionTrait, Set, TransactionTrait, Unchanged};

use crate::{
error::IngesterError,
program_transformers::bubblegum::{update_asset, update_creator},
};
use crate::{error::IngesterError, program_transformers::bubblegum::update_creator};

use super::save_changelog_event;
use super::{save_changelog_event, update_compressed_asset};

pub async fn process<'c, T>(
parsing_result: &BubblegumInstruction,
Expand Down Expand Up @@ -92,7 +89,7 @@ where
..Default::default()
};

update_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?;
update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?;
id_bytes
} // _ => return Err(IngesterError::NotImplemented),
};
Expand Down
8 changes: 5 additions & 3 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
//TODO -> set maximum size of path and break into multiple statements
}

pub async fn update_asset<T>(
pub async fn update_compressed_asset<T>(
txn: &T,
id: Vec<u8>,
seq: Option<u64>,
Expand All @@ -141,7 +141,9 @@ where
asset::Entity::update(model).filter(
Condition::all()
.add(asset::Column::Id.eq(id.clone()))
.add(asset::Column::Seq.lte(seq)),
.add(asset::Column::Seq.lte(seq))
// Do not update once asset is decompressed
.add(asset::Column::Compressed.eq(true)),
)
} else {
asset::Entity::update(model).filter(asset::Column::Id.eq(id.clone()))
Expand All @@ -153,7 +155,7 @@ where
DbErr::RecordNotFound(ref s) => {
if s.contains("None of the database rows are affected") {
warn!(
"Update failed. No asset found for id {}.",
"Skipping update for asset {}. It either does not exist or has already been decompressed.",
bs58::encode(id).into_string()
);
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions nft_ingester/src/program_transformers/bubblegum/delegate.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{save_changelog_event, update_asset};
use super::{save_changelog_event, update_compressed_asset};
use crate::error::IngesterError;
use blockbuster::{
instruction::InstructionBundle,
Expand Down Expand Up @@ -39,8 +39,8 @@ where
seq: Set(seq as i64), // gummyroll seq
..Default::default()
};
update_asset(txn, id_bytes, Some(seq), asset_to_update).await
} // _ => Err(IngesterError::NotImplemented),
update_compressed_asset(txn, id_bytes, Some(seq), asset_to_update).await
}
};
}
Err(IngesterError::ParsingError(
Expand Down
8 changes: 8 additions & 0 deletions nft_ingester/src/program_transformers/bubblegum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,11 @@ where
}
Ok(())
}

// PDA lookup requires an 8-byte array.
fn u32_to_u8_array(value: u32) -> [u8; 8] {
let bytes: [u8; 4] = value.to_le_bytes();
let mut result: [u8; 8] = [0; 8];
result[..4].copy_from_slice(&bytes);
result
}
60 changes: 28 additions & 32 deletions nft_ingester/src/program_transformers/bubblegum/redeem.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::{error::IngesterError, program_transformers::bubblegum::update_asset};
use log::debug;
use sea_orm::entity::*;

use super::save_changelog_event;
use blockbuster::{
instruction::InstructionBundle,
programs::bubblegum::{BubblegumInstruction, LeafSchema},
};
use crate::{error::IngesterError, program_transformers::bubblegum::update_compressed_asset};

use super::{save_changelog_event, u32_to_u8_array};
use blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction};
use digital_asset_types::dao::asset;
use sea_orm::{entity::*, ConnectionTrait, TransactionTrait};
use sea_orm::{ConnectionTrait, TransactionTrait};
use solana_sdk::pubkey::Pubkey;

pub async fn redeem<'c, T>(
parsing_result: &BubblegumInstruction,
Expand All @@ -16,33 +17,28 @@ pub async fn redeem<'c, T>(
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
if let Some(cl) = &parsing_result.tree_update {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
return match le.schema {
LeafSchema::V1 {
id,
delegate,
owner,
..
} => {
let id_bytes = id.to_bytes().to_vec();
let delegate = if owner == delegate {
None
} else {
Some(delegate.to_bytes().to_vec())
};
let owner_bytes = owner.to_bytes().to_vec();
let asset_to_update = asset::ActiveModel {
id: Unchanged(id_bytes.clone()),
leaf: Set(Some(vec![0; 32])),
delegate: Set(delegate),
owner: Set(Some(owner_bytes)),
seq: Set(seq as i64),
..Default::default()
};
update_asset(txn, id_bytes, Some(seq), asset_to_update).await
} // _ => Err(IngesterError::NotImplemented),
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
&[
"asset".as_bytes(),
cl.id.as_ref(),
u32_to_u8_array(leaf_index).as_ref(),
],
&mpl_bubblegum::ID,
);
debug!("Indexing redeem for asset id: {:?}", asset_id);
let id_bytes = asset_id.to_bytes().to_vec();
let asset_to_update = asset::ActiveModel {
id: Unchanged(id_bytes.clone()),
leaf: Set(Some(vec![0; 32])),
seq: Set(seq as i64),
..Default::default()
};

update_compressed_asset(txn, id_bytes, Some(seq as u64), asset_to_update).await?;
return Ok(());
}
Err(IngesterError::ParsingError(
"Ix not parsed correctly".to_string(),
Expand Down
8 changes: 4 additions & 4 deletions nft_ingester/src/program_transformers/bubblegum/transfer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{error::IngesterError, program_transformers::bubblegum::update_asset};
use crate::error::IngesterError;

use super::save_changelog_event;
use super::{save_changelog_event, update_compressed_asset};
use blockbuster::{
instruction::InstructionBundle,
programs::bubblegum::{BubblegumInstruction, LeafSchema},
Expand Down Expand Up @@ -40,8 +40,8 @@ where
seq: Set(seq as i64), // gummyroll seq
..Default::default()
};
update_asset(txn, id_bytes, Some(seq), asset_to_update).await
} // _ => Err(IngesterError::NotImplemented),
update_compressed_asset(txn, id_bytes, Some(seq), asset_to_update).await
}
};
}
Err(IngesterError::ParsingError(
Expand Down
64 changes: 64 additions & 0 deletions tools/txn_forwarder/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Transaction Forwarder

## Send single transaction locally

```
cargo run -- \
--redis-url 'redis://localhost:6379' \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 10 \
single --txn 3dzRYn994xzgUeomzczdkMAhicjPZKXkKFSvYKjQpT2Lbnv1pG3DG7de8sVomUMFX9Y3Fquz194jtvVH2sRWMAX7
```

## Send single transaction to Dev/Prod

```
cargo run -- \
--redis-url $REDIS_URL \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 10 \
single --txn 2HzRtBYKPxRn17LJwYHUNALPJfLRwxwLsdZzfToCD2UoYgw5TCGcSEwkHTvVgrv4s6b9v9hr5tpV2tSjWGa76AWd
```

## Backfill tree locally

```
cargo run -- \
--redis-url 'redis://localhost:6379' \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 10 \
address --address Cu61XHSkbasbvBc3atv5NUMz6C8FYmocNkH7mtjLFjR7
```

## Backfill tree against Dev/Prod

```
cargo run -- \
--redis-url $REDIS_URL \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 3 \
address --address GAnNkHUWwcC4s4jFgbPT491KtvVRuGBYefZ7Qahcmpqy
```

If you want to run against a range, you can use the `before` and/or `after` parameters. Example:

```
cargo run -- \
--redis-url $REDIS_URL \
--rpc-url $RPC_URL \
--max-retries 10 \
--concurrency 5 \
--after 'SwxK31AXJwCXkHkWSFA4f1u1V9jYgcwr8oDbeqEcNhJEVEgzEmgLVEsQ1pyLeTgzPAHDTmrHfcu5q9QSaAnwPs4' \
--before '5MUNFgEiFU5FwjasHroDK328Vrj4QAcf2zEK8FKVCBVKpLTynUCaTWNxJDyGMeAemij4qEAYXujVDRHT6WjzJxEX' \
--replay-forward \
address --address GAnNkHUWwcC4s4jFgbPT491KtvVRuGBYefZ7Qahcmpqy
```

This will push all transactions that are newer than `4DbGBhhcNRar1tL12VWciqAGUsZNaeom9iuWDbza7cE4d3VR9BbD5wkbnu44b4sDkjiqT14nPCxVLzRAqtjhkkWj`.

If we want to ensure the transactions are sent in an order we want, ensure that concurrency is set to 1. When there's concurrency, the ordering is
not guranteed.

0 comments on commit b253aab

Please sign in to comment.