ingester
tahsintunan committed Jan 4, 2024
1 parent 8b23eb4 commit c421351
Showing 16 changed files with 82 additions and 106 deletions.
8 changes: 6 additions & 2 deletions nft_ingester/src/account_updates.rs
@@ -26,7 +26,7 @@ pub fn account_worker<T: Messenger>(
tokio::spawn(async move {
let source = T::new(config).await;
if let Ok(mut msg) = source {
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender, false));
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
loop {
let e = msg.recv(stream_key, consumption_type.clone()).await;
let mut tasks = JoinSet::new();
@@ -65,7 +65,11 @@ pub fn account_worker<T: Messenger>(
})
}

async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
async fn handle_account(
manager: Arc<ProgramTransformer>,
item: RecvData,
stream_key: &'static str,
) -> Option<String> {
let id = item.id;
let mut ret_id = None;
let data = item.data;
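For context on the worker loop touched above, here is a standalone sketch of the per-batch JoinSet pattern it relies on; this is generic tokio usage with made-up item data, not code from this commit.

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    // Spawn one task per item, mirroring how account_worker fans out handle_account calls.
    let items = vec!["msg-1", "msg-2", "msg-3"];
    let mut tasks = JoinSet::new();
    for item in items {
        tasks.spawn(async move { format!("handled {item}") });
    }
    // Drain the set so every spawned task finishes before the next batch is received.
    while let Some(res) = tasks.join_next().await {
        println!("{}", res.unwrap());
    }
}
```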
1 change: 0 additions & 1 deletion nft_ingester/src/config.rs
@@ -33,7 +33,6 @@ pub struct IngesterConfig {
pub transaction_backfill_stream_worker_count: Option<u32>,
pub code_version: Option<&'static str>,
pub background_task_runner_config: Option<BackgroundTaskRunnerConfig>,
pub cl_audits: Option<bool>, // save transaction logs for compressed nfts
}

impl IngesterConfig {
6 changes: 2 additions & 4 deletions nft_ingester/src/main.rs
@@ -27,7 +27,8 @@ use chrono::Duration;
use clap::{arg, command, value_parser};
use log::{error, info};
use plerkle_messenger::{
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM,
TRANSACTION_BACKFILL_STREAM, TRANSACTION_STREAM,
};
use std::{path::PathBuf, time};
use tokio::{signal, task::JoinSet};
@@ -118,7 +119,6 @@ pub async fn main() -> Result<(), IngesterError> {
TRANSACTION_BACKFILL_STREAM,
)?;


if let Some(t) = timer_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
@@ -175,7 +175,6 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_STREAM,
);
}
@@ -190,7 +189,6 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_BACKFILL_STREAM,
);
}
11 changes: 3 additions & 8 deletions nft_ingester/src/metrics.rs
@@ -5,10 +5,7 @@ use cadence_macros::{is_global_default_set, set_global_default, statsd_count, st
use log::{error, warn};
use tokio::time::Instant;

use crate::{
config::IngesterConfig,
error::IngesterError,
};
use crate::{config::IngesterConfig, error::IngesterError};

#[macro_export]
macro_rules! metric {
@@ -32,17 +29,15 @@ pub fn setup_metrics(config: &IngesterConfig) {
let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap();
let queuing_sink = QueuingMetricSink::from(udp_sink);
let builder = StatsdClient::builder("das_ingester", queuing_sink);
let client = builder
.with_tag("env", env)
.build();
let client = builder.with_tag("env", env).build();
set_global_default(client);
}
}

// Returns a boolean indicating whether the redis message should be ACK'd.
// If the message is not ACK'd, it will be retried as long as it is under the retry limit.
pub fn capture_result(
id: String,
_id: String,
stream: &str,
label: (&str, &str),
tries: usize,
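As a standalone illustration of the cadence wiring that setup_metrics now condenses into a single builder chain, here is an independent sketch with placeholder host, port, and metric names; it is not taken from this repository.

```rust
use cadence::prelude::*;
use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient};
use std::net::UdpSocket;

fn main() {
    // UDP socket -> buffered sink -> queuing sink -> tagged client, as in setup_metrics.
    let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
    socket.set_nonblocking(true).unwrap();
    let udp_sink = BufferedUdpMetricSink::from(("127.0.0.1", 8125), socket).unwrap();
    let queuing_sink = QueuingMetricSink::from(udp_sink);
    let client = StatsdClient::builder("das_ingester", queuing_sink)
        .with_tag("env", "dev")
        .build();
    // Emit a counter directly; the ingester itself goes through its `metric!` macro instead.
    let _ = client.incr("ingester.startup");
}
```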
4 changes: 2 additions & 2 deletions nft_ingester/src/program_transformers/bubblegum/burn.rs
@@ -17,13 +17,13 @@ pub async fn burn<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
instruction: &str,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let Some(cl) = &parsing_result.tree_update {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let leaf_index = cl.index;
let (asset_id, _) = Pubkey::find_program_address(
&[
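The burn handler above re-derives the asset id from the tree and leaf index via Pubkey::find_program_address; a minimal sketch of that standard Bubblegum PDA derivation, written against solana-program with an assumed helper name, looks like this.

```rust
use solana_program::pubkey::Pubkey;

// Hypothetical helper: the asset id is the PDA of ["asset", tree, leaf_index_le]
// under the Bubblegum program id, which is what the handler's find_program_address call computes.
fn derive_asset_id(tree: &Pubkey, leaf_index: u64, bubblegum_program_id: &Pubkey) -> Pubkey {
    let leaf_index_le = leaf_index.to_le_bytes();
    let seeds: &[&[u8]] = &[b"asset", tree.as_ref(), &leaf_index_le];
    let (asset_id, _bump) = Pubkey::find_program_address(seeds, bubblegum_program_id);
    asset_id
}
```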
@@ -15,13 +15,13 @@ pub async fn cancel_redeem<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
instruction: &str,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
#[allow(unreachable_patterns)]
return match le.schema {
LeafSchema::V1 {
@@ -13,7 +13,7 @@ pub async fn process<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
instruction: &str,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
@@ -37,7 +37,7 @@ where
"Handling collection verification event for {} (verify: {}): {}",
collection, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let id_bytes = match le.schema {
LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
};
@@ -17,7 +17,7 @@ pub async fn process<'c, T>(
bundle: &InstructionBundle<'c>,
txn: &'c T,
value: bool,
cl_audits: bool,
instruction: &str,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
@@ -41,7 +41,7 @@ where
"Handling creator verification event for creator {} (verify: {}): {}",
creator, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;

let asset_id_bytes = match le.schema {
LeafSchema::V1 {
93 changes: 38 additions & 55 deletions nft_ingester/src/program_transformers/bubblegum/db.rs
@@ -1,27 +1,24 @@
use crate::error::IngesterError;
use digital_asset_types::dao::{
asset, asset_creators, asset_grouping, backfill_items, cl_audits, cl_items,
asset, asset_creators, asset_grouping, cl_audits_v2, cl_items,
sea_orm_active_enums::Instruction,
};
use log::{debug, info};
use log::{debug, error};
use mpl_bubblegum::types::Collection;
use sea_orm::{
query::*, sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DbBackend, EntityTrait,
};
use sea_orm::{query::*, sea_query::OnConflict, ActiveValue::Set, DbBackend, EntityTrait};
use spl_account_compression::events::ChangeLogEventV1;

use std::convert::From;

pub async fn save_changelog_event<'c, T>(
change_log_event: &ChangeLogEventV1,
slot: u64,
txn_id: &str,
txn: &T,
cl_audits: bool,
instruction: &str,
) -> Result<u64, IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
insert_change_log(change_log_event, slot, txn_id, txn, cl_audits).await?;
insert_change_log(change_log_event, slot, txn_id, txn, instruction).await?;
Ok(change_log_event.seq)
}

@@ -31,10 +28,10 @@ fn node_idx_to_leaf_idx(index: i64, tree_height: u32) -> i64 {

pub async fn insert_change_log<'c, T>(
change_log_event: &ChangeLogEventV1,
slot: u64,
_slot: u64,
txn_id: &str,
txn: &T,
cl_audits: bool,
instruction: &str,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
@@ -45,12 +42,13 @@ where
for p in change_log_event.path.iter() {
let node_idx = p.index as i64;
debug!(
"seq {}, index {} level {}, node {:?}, txn: {:?}",
"seq {}, index {} level {}, node {:?}, txn: {:?}, instruction {}",
change_log_event.seq,
p.index,
i,
bs58::encode(p.node).into_string(),
txn_id,
instruction
);
let leaf_idx = if i == 0 {
Some(node_idx_to_leaf_idx(node_idx, depth as u32))
@@ -68,14 +66,6 @@ where
..Default::default()
};

let mut audit_item: Option<cl_audits::ActiveModel> = if (cl_audits) {
let mut ai: cl_audits::ActiveModel = item.clone().into();
ai.tx = Set(txn_id.to_string());
Some(ai)
} else {
None
};

i += 1;
let mut query = cl_items::Entity::insert(item)
.on_conflict(
@@ -93,46 +83,39 @@ where
txn.execute(query)
.await
.map_err(|db_err| IngesterError::StorageWriteError(db_err.to_string()))?;

// Insert the audit item after the insert into cl_items have been completed
if let Some(audit_item) = audit_item {
cl_audits::Entity::insert(audit_item).exec(txn).await?;
}
}

// If and only if the entire path of nodes was inserted into the `cl_items` table, then insert
// a single row into the `backfill_items` table. This way if an incomplete path was inserted
// into `cl_items` due to an error, a gap will be created for the tree and the backfiller will
// fix it.
if i - 1 == depth as i64 {
// See if the tree already exists in the `backfill_items` table.
let rows = backfill_items::Entity::find()
.filter(backfill_items::Column::Tree.eq(tree_id))
.limit(1)
.all(txn)
.await?;

// If the tree does not exist in `backfill_items` and the sequence number is greater than 1,
// then we know we will need to backfill the tree from sequence number 1 up to the current
// sequence number. So in this case we set at flag to force checking the tree.
let force_chk = rows.is_empty() && change_log_event.seq > 1;

info!("Adding to backfill_items table at level {}", i - 1);
let item = backfill_items::ActiveModel {
tree: Set(tree_id.to_vec()),
seq: Set(change_log_event.seq as i64),
slot: Set(slot as i64),
force_chk: Set(force_chk),
backfilled: Set(false),
failed: Set(false),
..Default::default()
};

backfill_items::Entity::insert(item).exec(txn).await?;
// Insert the audit item after the insert into cl_items have been completed
let tx_id_bytes = bs58::decode(txn_id)
.into_vec()
.map_err(|_e| IngesterError::ChangeLogEventMalformed)?;
let audit_item_v2 = cl_audits_v2::ActiveModel {
tree: Set(tree_id.to_vec()),
leaf_idx: Set(change_log_event.index as i64),
seq: Set(change_log_event.seq as i64),
tx: Set(tx_id_bytes),
instruction: Set(Instruction::from_str(instruction)),
..Default::default()
};
let query = cl_audits_v2::Entity::insert(audit_item_v2)
.on_conflict(
OnConflict::columns([
cl_audits_v2::Column::Tree,
cl_audits_v2::Column::LeafIdx,
cl_audits_v2::Column::Seq,
])
.do_nothing()
.to_owned(),
)
.build(DbBackend::Postgres);
match txn.execute(query).await {
Ok(_) => {}
Err(e) => {
error!("Error while inserting into cl_audits_v2: {:?}", e);
}
}

Ok(())
//TODO -> set maximum size of path and break into multiple statements
}

pub async fn upsert_asset_with_leaf_info<T>(
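For reference, a rough sketch of the cl_audits_v2 entity shape implied by the insert above. This is inferred from the columns the ActiveModel sets and is not the actual definition in digital_asset_types, which models the instruction column with the sea_orm_active_enums::Instruction enum rather than a plain String.

```rust
use sea_orm::entity::prelude::*;

// Assumed shape of the cl_audits_v2 table: one audit row per (tree, leaf_idx, seq),
// storing the raw transaction signature bytes and the instruction label that produced it.
#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "cl_audits_v2")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i64,
    pub tree: Vec<u8>,
    pub leaf_idx: i64,
    pub seq: i64,
    pub tx: Vec<u8>,
    pub instruction: String, // simplified here; the real column is an enum type
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}
```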
4 changes: 2 additions & 2 deletions nft_ingester/src/program_transformers/bubblegum/delegate.rs
@@ -15,13 +15,13 @@ pub async fn delegate<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
instruction: &str,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
return match le.schema {
LeafSchema::V1 {
id,
4 changes: 2 additions & 2 deletions nft_ingester/src/program_transformers/bubblegum/mint_v1.rs
@@ -39,7 +39,7 @@ pub async fn mint_v1<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
cl_audits: bool,
instruction: &str,
) -> Result<Option<TaskData>, IngesterError>
where
T: ConnectionTrait + TransactionTrait,
Expand All @@ -49,7 +49,7 @@ where
&parsing_result.tree_update,
&parsing_result.payload,
) {
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, cl_audits).await?;
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
let metadata = args;
#[allow(unreachable_patterns)]
return match le.schema {
