Skip to content

Commit

Permalink
Make cl audits configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
linuskendall committed Sep 19, 2023
1 parent 642c98c commit e5d5ecc
Show file tree
Hide file tree
Showing 15 changed files with 48 additions and 25 deletions.
2 changes: 1 addition & 1 deletion nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn account_worker<T: Messenger>(
tokio::spawn(async move {
let source = T::new(config).await;
if let Ok(mut msg) = source {
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, false));
loop {
let e = msg.recv(ACCOUNT_STREAM, consumption_type.clone()).await;
let mut tasks = JoinSet::new();
Expand Down
4 changes: 3 additions & 1 deletion nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct IngesterConfig {
pub transaction_stream_worker_count: Option<u32>,
pub code_version: Option<&'static str>,
pub background_task_runner_config: Option<BackgroundTaskRunnerConfig>,
pub cl_audits: Option<bool>, // save transaction logs for compressed nfts
}

impl IngesterConfig {
Expand Down Expand Up @@ -124,7 +125,8 @@ pub fn setup_config(config_file: Option<&PathBuf>) -> IngesterConfig {
figment = figment.join(Yaml::file(config_file));
}

let mut config: IngesterConfig = figment
let mut config: IngesterConfig =
figment
.extract()
.map_err(|config_error| IngesterError::ConfigurationError {
msg: format!("{}", config_error),
Expand Down
1 change: 1 addition & 0 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
);
}
}
Expand Down
3 changes: 2 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/burn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ pub async fn burn<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let Some(cl) = &parsing_result.tree_update {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
&[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ pub async fn cancel_redeem<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
#[allow(unreachable_patterns)]
return match le.schema {
LeafSchema::V1 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub async fn process<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -36,7 +37,7 @@ where
"Handling collection verification event for {} (verify: {}): {}",
collection, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let id_bytes = match le.schema {
LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub async fn process<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
value: bool,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -40,7 +41,7 @@ where
"Handling creator verification event for creator {} (verify: {}): {}",
creator, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;

let asset_id_bytes = match le.schema {
LeafSchema::V1 {
Expand Down
15 changes: 11 additions & 4 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ pub async fn save_changelog_event<'c, T>(
slot: u64,
txn_id: &str,
txn: &T,
cl_audits: bool,
) -> Result<u64, IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
insert_change_log(change_log_event, slot, txn_id, txn).await?;
insert_change_log(change_log_event, slot, txn_id, txn, cl_audits).await?;
Ok(change_log_event.seq)
}

Expand All @@ -31,6 +32,7 @@ pub async fn insert_change_log<'c, T>(
slot: u64,
txn_id: &str,
txn: &T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand Down Expand Up @@ -65,8 +67,11 @@ where
..Default::default()
};

let mut audit_item : cl_audits::ActiveModel = item.clone().into();
audit_item.tx = Set(txn_id.to_string());
let mut audit_item : Option<cl_audits::ActiveModel> = if(cl_audits) {
let mut ai : cl_audits::ActiveModel = item.clone().into();
ai.tx = Set(txn_id.to_string());
Some(ai)
} else { None };

i += 1;
let mut query = cl_items::Entity::insert(item)
Expand All @@ -88,7 +93,9 @@ where


// Insert the audit item after the insert into cl_items have been completed
cl_audits::Entity::insert(audit_item).exec(txn).await?;
if let Some(audit_item) = audit_item {
cl_audits::Entity::insert(audit_item).exec(txn).await?;
}
}

// If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
Expand Down
3 changes: 2 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ pub async fn delegate<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
return match le.schema {
LeafSchema::V1 {
id,
Expand Down
3 changes: 2 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub async fn mint_v1<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<TaskData, IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -47,7 +48,7 @@ where
&parsing_result.tree_update,
&parsing_result.payload,
) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let metadata = args;
#[allow(unreachable_patterns)]
return match le.schema {
Expand Down
19 changes: 10 additions & 9 deletions nft_ingester/src/program_transformers/bubblegum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub async fn handle_bubblegum_instruction<'c, T>(
bundle: &'c InstructionBundle<'c>,
txn: &T,
task_manager: &UnboundedSender<TaskData>,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand Down Expand Up @@ -57,38 +58,38 @@ where

match ix_type {
InstructionName::Transfer => {
transfer::transfer(parsing_result, bundle, txn).await?;
transfer::transfer(parsing_result, bundle, txn, cl_audits).await?;
}
InstructionName::Burn => {
burn::burn(parsing_result, bundle, txn).await?;
burn::burn(parsing_result, bundle, txn, cl_audits).await?;
}
InstructionName::Delegate => {
delegate::delegate(parsing_result, bundle, txn).await?;
delegate::delegate(parsing_result, bundle, txn, cl_audits).await?;
}
InstructionName::MintV1 | InstructionName::MintToCollectionV1 => {
let task = mint_v1::mint_v1(parsing_result, bundle, txn).await?;
let task = mint_v1::mint_v1(parsing_result, bundle, txn, cl_audits).await?;

task_manager.send(task)?;
}
InstructionName::Redeem => {
redeem::redeem(parsing_result, bundle, txn).await?;
redeem::redeem(parsing_result, bundle, txn, cl_audits).await?;
}
InstructionName::CancelRedeem => {
cancel_redeem::cancel_redeem(parsing_result, bundle, txn).await?;
cancel_redeem::cancel_redeem(parsing_result, bundle, txn, cl_audits).await?;
}
InstructionName::DecompressV1 => {
decompress::decompress(parsing_result, bundle, txn).await?;
}
InstructionName::VerifyCreator => {
creator_verification::process(parsing_result, bundle, txn, true).await?;
creator_verification::process(parsing_result, bundle, txn, true, cl_audits).await?;
}
InstructionName::UnverifyCreator => {
creator_verification::process(parsing_result, bundle, txn, false).await?;
creator_verification::process(parsing_result, bundle, txn, false, cl_audits).await?;
}
InstructionName::VerifyCollection
| InstructionName::UnverifyCollection
| InstructionName::SetAndVerifyCollection => {
collection_verification::process(parsing_result, bundle, txn).await?;
collection_verification::process(parsing_result, bundle, txn, cl_audits).await?;
}
_ => debug!("Bubblegum: Not Implemented Instruction"),
}
Expand Down
3 changes: 2 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/redeem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ pub async fn redeem<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let Some(cl) = &parsing_result.tree_update {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
&[
Expand Down
3 changes: 2 additions & 1 deletion nft_ingester/src/program_transformers/bubblegum/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ pub async fn transfer<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
#[allow(unreachable_patterns)]
return match le.schema {
LeafSchema::V1 {
Expand Down
5 changes: 4 additions & 1 deletion nft_ingester/src/program_transformers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ pub struct ProgramTransformer {
task_sender: UnboundedSender<TaskData>,
matchers: HashMap<Pubkey, Box<dyn ProgramParser>>,
key_set: HashSet<Pubkey>,
cl_audits: bool,
}

impl ProgramTransformer {
pub fn new(pool: PgPool, task_sender: UnboundedSender<TaskData>) -> Self {
pub fn new(pool: PgPool, task_sender: UnboundedSender<TaskData>, cl_audits: bool) -> Self {
let mut matchers: HashMap<Pubkey, Box<dyn ProgramParser>> = HashMap::with_capacity(1);
let bgum = BubblegumParser {};
let token_metadata = TokenMetadataParser {};
Expand All @@ -50,6 +51,7 @@ impl ProgramTransformer {
task_sender,
matchers,
key_set: hs,
cl_audits: cl_audits,
}
}

Expand Down Expand Up @@ -125,6 +127,7 @@ impl ProgramTransformer {
&ix,
&self.storage,
&self.task_sender,
self.cl_audits,
)
.await
.map_err(|err| {
Expand Down
3 changes: 2 additions & 1 deletion nft_ingester/src/transaction_notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ pub fn transaction_worker<T: Messenger>(
bg_task_sender: UnboundedSender<TaskData>,
ack_channel: UnboundedSender<(&'static str, String)>,
consumption_type: ConsumptionType,
cl_audits: bool,
) -> JoinHandle<()> {
tokio::spawn(async move {
let source = T::new(config).await;
if let Ok(mut msg) = source {
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, cl_audits));
loop {
let e = msg.recv(TRANSACTION_STREAM, consumption_type.clone()).await;
let mut tasks = JoinSet::new();
Expand Down

0 comments on commit e5d5ecc

Please sign in to comment.