From 75286332ca394c11e1256ac64bf86045f0d78fef Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Thu, 30 May 2024 14:15:55 +0200 Subject: [PATCH] refactor: move bubble backfill to a lib so can be used by other projects eg LightDAS and be included in the grpc-ingest to execute backfills on restart then pick up tasks from the stream to insure no out of order indexing --- Cargo.lock | 67 ++- Cargo.toml | 27 +- bubblegum-backfill/Cargo.toml | 40 ++ bubblegum-backfill/src/error.rs | 19 + bubblegum-backfill/src/gap.rs | 140 ++++++ bubblegum-backfill/src/lib.rs | 69 +++ bubblegum-backfill/src/tree.rs | 118 +++++ bubblegum-backfill/src/worker/gap.rs | 65 +++ bubblegum-backfill/src/worker/mod.rs | 9 + .../src/worker/program_transformer.rs | 50 ++ bubblegum-backfill/src/worker/transaction.rs | 210 +++++++++ bubblegum-backfill/src/worker/tree.rs | 114 +++++ core/Cargo.toml | 28 +- core/src/lib.rs | 2 + core/src/metadata_json.rs | 223 +++++++++ nft_ingester/Cargo.toml | 1 + nft_ingester/src/tasks/common/mod.rs | 2 +- ops/Cargo.toml | 2 +- ops/src/account/account_info.rs | 2 +- ops/src/bubblegum/README.md | 60 +-- ops/src/bubblegum/audit.rs | 89 ---- ops/src/bubblegum/backfiller.rs | 443 +----------------- ops/src/bubblegum/cmd.rs | 29 +- ops/src/bubblegum/mod.rs | 2 - ops/src/bubblegum/tree.rs | 260 ---------- program_transformers/Cargo.toml | 6 +- program_transformers/src/lib.rs | 29 +- 27 files changed, 1189 insertions(+), 917 deletions(-) create mode 100644 bubblegum-backfill/Cargo.toml create mode 100644 bubblegum-backfill/src/error.rs create mode 100644 bubblegum-backfill/src/gap.rs create mode 100644 bubblegum-backfill/src/lib.rs create mode 100644 bubblegum-backfill/src/tree.rs create mode 100644 bubblegum-backfill/src/worker/gap.rs create mode 100644 bubblegum-backfill/src/worker/mod.rs create mode 100644 bubblegum-backfill/src/worker/program_transformer.rs create mode 100644 bubblegum-backfill/src/worker/transaction.rs create mode 100644 bubblegum-backfill/src/worker/tree.rs create mode 100644 core/src/metadata_json.rs delete mode 100644 ops/src/bubblegum/audit.rs delete mode 100644 ops/src/bubblegum/tree.rs diff --git a/Cargo.lock b/Cargo.lock index d7de6d4b8..7629dca29 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1524,6 +1524,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + [[package]] name = "core-foundation" version = "0.9.3" @@ -1710,24 +1716,68 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "das-bubblegum-backfill" +version = "0.7.2" +dependencies = [ + "anchor-client", + "anyhow", + "blockbuster", + "borsh 0.10.3", + "bs58 0.4.0", + "clap 4.4.8", + "das-core", + "digital_asset_types", + "futures", + "heck 0.5.0", + "log", + "mpl-bubblegum", + "num-traits", + "program_transformers", + "sea-orm", + "serde_json", + "solana-client", + "solana-program", + "solana-sdk", + "solana-transaction-status", + "spl-account-compression", + "spl-token", + "sqlx", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "das-core" version = "0.7.2" dependencies = [ "anyhow", "backon", + "borsh 0.10.3", + "bs58 0.4.0", "cadence", "cadence-macros", "clap 4.4.8", + "derive_more", + "digital_asset_types", "figment", + "futures", + "indicatif", + "log", "plerkle_messenger", + "reqwest", + "sea-orm", + "serde_json", "solana-account-decoder", "solana-client", "solana-sdk", "solana-transaction-status", + "spl-account-compression", "sqlx", "thiserror", "tokio", + "url", ] [[package]] @@ -1736,12 +1786,12 @@ version = "0.7.2" dependencies = [ "anchor-client", "anyhow", - "backon", "borsh 0.10.3", "bs58 0.4.0", "cadence", "cadence-macros", "clap 4.4.8", + "das-bubblegum-backfill", "das-core", "digital_asset_types", "env_logger 0.10.0", @@ -1877,6 +1927,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_more" +version = "0.99.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" +dependencies = [ + "convert_case", + "proc-macro2", + "quote", + "rustc_version", + "syn 1.0.109", +] + [[package]] name = "deunicode" version = "1.4.3" @@ -3544,6 +3607,7 @@ dependencies = [ "cadence-macros", "chrono", "clap 4.4.8", + "das-core", "digital_asset_types", "figment", "flatbuffers", @@ -4276,6 +4340,7 @@ dependencies = [ "bs58 0.4.0", "cadence", "cadence-macros", + "das-core", "digital_asset_types", "futures", "heck 0.5.0", diff --git a/Cargo.toml b/Cargo.toml index 98d55ecff..74384755e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "blockbuster", + "bubblegum-backfill", "core", "das_api", "digital_asset_types", @@ -32,18 +33,20 @@ anyhow = "1.0.75" async-std = "1.0.0" async-trait = "0.1.60" backon = "0.4.1" -blockbuster = {path = "blockbuster"} +blockbuster = { path = "blockbuster" } borsh = "~0.10.3" borsh-derive = "~0.10.3" bs58 = "0.4.0" -bytemuck = {version = "1.14.0", features = ["derive"]} +bytemuck = { version = "1.14.0", features = ["derive"] } cadence = "0.29.0" cadence-macros = "0.29.0" chrono = "0.4.19" clap = "4.2.2" -das-core = {path = "core"} -das_api = {path = "das_api"} -digital_asset_types = {path = "digital_asset_types"} +das-core = { path = "core" } +das-bubblegum-backfill = { path = "bubblegum-backfill" } +das_api = { path = "das_api" } +derive_more = { version = "0.99.17" } +digital_asset_types = { path = "digital_asset_types" } enum-iterator = "1.2.0" enum-iterator-derive = "1.1.0" env_logger = "0.10.0" @@ -65,12 +68,12 @@ jsonrpsee-core = "0.16.2" lazy_static = "1.4.0" log = "0.4.17" metrics = "0.20.1" -migration = {path = "migration"} +migration = { path = "migration" } mime_guess = "2.0.4" mpl-bubblegum = "1.2.0" -mpl-core = {version = "0.7.0", features = ["serde"]} +mpl-core = { version = "0.7.0", features = ["serde"] } mpl-token-metadata = "4.1.1" -nft_ingester = {path = "nft_ingester"} +nft_ingester = { path = "nft_ingester" } num-derive = "0.3.3" num-traits = "0.2.15" once_cell = "1.19.0" @@ -78,7 +81,7 @@ open-rpc-derive = "0.0.4" open-rpc-schema = "0.0.4" plerkle_messenger = "1.6.0" plerkle_serialization = "1.8.0" -program_transformers = {path = "program_transformers"} +program_transformers = { path = "program_transformers" } prometheus = "0.13.3" proxy-wasm = "0.2.0" rand = "0.8.5" @@ -104,9 +107,9 @@ spl-account-compression = "0.3.0" spl-associated-token-account = ">= 1.1.3, < 3.0" spl-concurrent-merkle-tree = "0.2.0" spl-noop = "0.2.0" -spl-pod = {version = "0.1.0", features = ["serde-traits"]} +spl-pod = { version = "0.1.0", features = ["serde-traits"] } spl-token = ">= 3.5.0, < 5.0" -spl-token-2022 = {version = "1.0", features = ["no-entrypoint"]} +spl-token-2022 = { version = "1.0", features = ["no-entrypoint"] } spl-token-group-interface = "0.1.0" spl-token-metadata-interface = "0.2.0" sqlx = "0.6.2" @@ -118,7 +121,7 @@ tower = "0.4.13" tower-http = "0.3.5" tracing = "0.1.35" tracing-subscriber = "0.3.16" -txn_forwarder = {path = "tools/txn_forwarder"} +txn_forwarder = { path = "tools/txn_forwarder" } url = "2.3.1" wasi = "0.7.0" wasm-bindgen = "0.2.83" diff --git a/bubblegum-backfill/Cargo.toml b/bubblegum-backfill/Cargo.toml new file mode 100644 index 000000000..8b49f7551 --- /dev/null +++ b/bubblegum-backfill/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "das-bubblegum-backfill" +version = { workspace = true } +edition = { workspace = true } +repository = { workspace = true } +publish = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +blockbuster = { workspace = true } +bs58 = { workspace = true } +das-core = { workspace = true } +solana-client = { workspace = true } +borsh = { workspace = true } +digital_asset_types = { workspace = true, features = [ + "json_types", + "sql_types", +] } +anchor-client = { workspace = true } +futures = { workspace = true } +clap = { workspace = true } +log = { workspace = true } +solana-program = { workspace = true } +program_transformers = { workspace = true } +heck = { workspace = true } +mpl-bubblegum = { workspace = true } +num-traits = { workspace = true } +sea-orm = { workspace = true } +serde_json = { workspace = true } +solana-sdk = { workspace = true } +solana-transaction-status = { workspace = true } +spl-account-compression = { workspace = true, features = ["no-entrypoint"] } +spl-token = { workspace = true, features = ["no-entrypoint"] } +sqlx = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["time"] } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/bubblegum-backfill/src/error.rs b/bubblegum-backfill/src/error.rs new file mode 100644 index 000000000..420a15a52 --- /dev/null +++ b/bubblegum-backfill/src/error.rs @@ -0,0 +1,19 @@ +#[derive(Debug, thiserror::Error)] +pub enum ErrorKind { + #[error("anchor")] + Anchor(#[from] anchor_client::anchor_lang::error::Error), + #[error("solana rpc")] + Rpc(#[from] solana_client::client_error::ClientError), + #[error("parse pubkey")] + ParsePubkey(#[from] solana_sdk::pubkey::ParsePubkeyError), + #[error("serialize tree response")] + SerializeTreeResponse, + #[error("sea orm")] + Database(#[from] sea_orm::DbErr), + #[error("try from pubkey")] + TryFromPubkey, + #[error("try from signature")] + TryFromSignature, + #[error("generic error: {0}")] + Generic(String), +} diff --git a/bubblegum-backfill/src/gap.rs b/bubblegum-backfill/src/gap.rs new file mode 100644 index 000000000..feb523b98 --- /dev/null +++ b/bubblegum-backfill/src/gap.rs @@ -0,0 +1,140 @@ +use super::ErrorKind; +use crate::Rpc; +use anyhow::Result; +use clap::Args; +use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value}; +use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; +use solana_sdk::{pubkey::Pubkey, signature::Signature}; +use std::str::FromStr; +use tokio::sync::mpsc::Sender; + +const GET_SIGNATURES_FOR_ADDRESS_LIMIT: usize = 1000; + +#[derive(Debug, Clone, Args)] +pub struct ConfigBackfiller { + /// Solana RPC URL + #[arg(long, env)] + pub solana_rpc_url: String, +} + +const TREE_GAP_SQL: &str = r#" +WITH sequenced_data AS ( + SELECT + tree, + seq, + LEAD(seq) OVER (ORDER BY seq ASC) AS next_seq, + tx AS current_tx, + LEAD(tx) OVER (ORDER BY seq ASC) AS next_tx + FROM + cl_audits_v2 + WHERE + tree = $1 +), +gaps AS ( + SELECT + tree, + seq AS gap_start_seq, + next_seq AS gap_end_seq, + current_tx AS lower_bound_tx, + next_tx AS upper_bound_tx + FROM + sequenced_data + WHERE + next_seq IS NOT NULL AND + next_seq - seq > 1 +) +SELECT + tree, + gap_start_seq, + gap_end_seq, + lower_bound_tx, + upper_bound_tx +FROM + gaps +ORDER BY + gap_start_seq; +"#; + +#[derive(Debug, FromQueryResult, PartialEq, Clone)] +pub struct TreeGapModel { + pub tree: Vec, + pub gap_start_seq: i64, + pub gap_end_seq: i64, + pub lower_bound_tx: Vec, + pub upper_bound_tx: Vec, +} + +impl TreeGapModel { + pub async fn find(conn: &DatabaseConnection, tree: Pubkey) -> Result, ErrorKind> { + let statement = Statement::from_sql_and_values( + DbBackend::Postgres, + TREE_GAP_SQL, + vec![Value::Bytes(Some(Box::new(tree.as_ref().to_vec())))], + ); + + TreeGapModel::find_by_statement(statement) + .all(conn) + .await + .map_err(Into::into) + } +} + +impl TryFrom for TreeGapFill { + type Error = ErrorKind; + + fn try_from(model: TreeGapModel) -> Result { + let tree = Pubkey::try_from(model.tree).map_err(|_| ErrorKind::TryFromPubkey)?; + let upper = + Signature::try_from(model.upper_bound_tx).map_err(|_| ErrorKind::TryFromSignature)?; + let lower = + Signature::try_from(model.lower_bound_tx).map_err(|_| ErrorKind::TryFromSignature)?; + + Ok(Self::new(tree, Some(upper), Some(lower))) + } +} + +pub struct TreeGapFill { + tree: Pubkey, + before: Option, + until: Option, +} + +impl TreeGapFill { + pub const fn new(tree: Pubkey, before: Option, until: Option) -> Self { + Self { + tree, + before, + until, + } + } + + pub async fn crawl(&self, client: Rpc, sender: Sender) -> Result<()> { + let mut before = self.before; + + loop { + let sigs = client + .get_signatures_for_address(&self.tree, before, self.until) + .await?; + let sig_count = sigs.len(); + + let successful_transactions = sigs + .into_iter() + .filter(|transaction| transaction.err.is_none()) + .collect::>(); + + for sig in successful_transactions.iter() { + let sig = Signature::from_str(&sig.signature)?; + + sender.send(sig).await?; + + before = Some(sig); + } + + if sig_count < GET_SIGNATURES_FOR_ADDRESS_LIMIT { + break; + } + } + + Ok(()) + } +} diff --git a/bubblegum-backfill/src/lib.rs b/bubblegum-backfill/src/lib.rs new file mode 100644 index 000000000..c6973f85b --- /dev/null +++ b/bubblegum-backfill/src/lib.rs @@ -0,0 +1,69 @@ +mod error; +mod gap; +mod tree; +mod worker; + +pub use error::ErrorKind; + +use anyhow::Result; +use clap::Parser; +use das_core::Rpc; +use futures::{stream::FuturesUnordered, StreamExt}; +use tree::TreeResponse; +use worker::TreeWorkerArgs; + +#[derive(Clone)] +pub struct BubblegumBackfillContext { + pub database_pool: sqlx::PgPool, + pub solana_rpc: Rpc, +} + +impl BubblegumBackfillContext { + pub const fn new(database_pool: sqlx::PgPool, solana_rpc: Rpc) -> Self { + Self { + database_pool, + solana_rpc, + } + } +} + +#[derive(Debug, Parser, Clone)] +pub struct BubblegumBackfillArgs { + /// Number of tree crawler workers + #[arg(long, env, default_value = "20")] + pub tree_crawler_count: usize, + + /// The list of trees to crawl. If not specified, all trees will be crawled. + #[arg(long, env, use_value_delimiter = true)] + pub only_trees: Option>, + + #[clap(flatten)] + pub tree_worker: TreeWorkerArgs, +} + +pub async fn start_bubblegum_backfill( + context: BubblegumBackfillContext, + args: BubblegumBackfillArgs, +) -> Result<()> { + let trees = if let Some(ref only_trees) = args.only_trees { + TreeResponse::find(&context.solana_rpc, only_trees.clone()).await? + } else { + TreeResponse::all(&context.solana_rpc).await? + }; + + let mut crawl_handles = FuturesUnordered::new(); + + for tree in trees { + if crawl_handles.len() >= args.tree_crawler_count { + crawl_handles.next().await; + } + let context = context.clone(); + let handle = args.tree_worker.start(context, tree); + + crawl_handles.push(handle); + } + + futures::future::try_join_all(crawl_handles).await?; + + Ok(()) +} diff --git a/bubblegum-backfill/src/tree.rs b/bubblegum-backfill/src/tree.rs new file mode 100644 index 000000000..efb5c1cbf --- /dev/null +++ b/bubblegum-backfill/src/tree.rs @@ -0,0 +1,118 @@ +use super::ErrorKind; +use anyhow::Result; +use borsh::BorshDeserialize; +use das_core::Rpc; +use solana_client::rpc_filter::{Memcmp, RpcFilterType}; +use solana_sdk::{account::Account, pubkey::Pubkey}; +use spl_account_compression::id; +use spl_account_compression::state::{ + merkle_tree_get_size, ConcurrentMerkleTreeHeader, CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1, +}; +use std::str::FromStr; + +#[derive(Debug, Clone)] +pub struct TreeHeaderResponse { + pub max_depth: u32, + pub max_buffer_size: u32, + pub creation_slot: u64, + pub size: usize, +} + +impl TryFrom for TreeHeaderResponse { + type Error = ErrorKind; + + fn try_from(payload: ConcurrentMerkleTreeHeader) -> Result { + let size = merkle_tree_get_size(&payload)?; + + Ok(Self { + max_depth: payload.get_max_depth(), + max_buffer_size: payload.get_max_buffer_size(), + creation_slot: payload.get_creation_slot(), + size, + }) + } +} + +#[derive(Debug, Clone)] +pub struct TreeResponse { + pub pubkey: Pubkey, + pub tree_header: TreeHeaderResponse, + pub seq: u64, +} + +impl TreeResponse { + pub fn try_from_rpc(pubkey: Pubkey, account: Account) -> Result { + let bytes = account.data.as_slice(); + + let (header_bytes, rest) = bytes.split_at(CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1); + let header: ConcurrentMerkleTreeHeader = + ConcurrentMerkleTreeHeader::try_from_slice(header_bytes)?; + + let merkle_tree_size = merkle_tree_get_size(&header)?; + let (tree_bytes, _canopy_bytes) = rest.split_at(merkle_tree_size); + + let seq_bytes = tree_bytes[0..8].try_into()?; + let seq = u64::from_le_bytes(seq_bytes); + + let (auth, _) = Pubkey::find_program_address(&[pubkey.as_ref()], &mpl_bubblegum::ID); + + header.assert_valid_authority(&auth)?; + + let tree_header = header.try_into()?; + + Ok(Self { + pubkey, + tree_header, + seq, + }) + } + + pub async fn all(client: &Rpc) -> Result, ErrorKind> { + Ok(client + .get_program_accounts( + &id(), + Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![1u8], + ))]), + ) + .await? + .into_iter() + .filter_map(|(pubkey, account)| Self::try_from_rpc(pubkey, account).ok()) + .collect()) + } + + pub async fn find(client: &Rpc, pubkeys: Vec) -> Result, ErrorKind> { + let pubkeys: Vec = pubkeys + .into_iter() + .map(|p| Pubkey::from_str(&p)) + .collect::, _>>()?; + let pubkey_batches = pubkeys.chunks(100); + let pubkey_batches_count = pubkey_batches.len(); + + let mut gma_handles = Vec::with_capacity(pubkey_batches_count); + + for batch in pubkey_batches { + gma_handles.push(async move { + let accounts = client.get_multiple_accounts(batch).await?; + + let results: Vec<(&Pubkey, Option)> = batch.iter().zip(accounts).collect(); + + Ok::<_, ErrorKind>(results) + }) + } + + let result = futures::future::try_join_all(gma_handles).await?; + + let trees = result + .into_iter() + .flatten() + .filter_map(|(pubkey, account)| { + account.map(|account| Self::try_from_rpc(*pubkey, account)) + }) + .collect::, _>>() + .map_err(|_| ErrorKind::SerializeTreeResponse)?; + + Ok(trees) + } +} diff --git a/bubblegum-backfill/src/worker/gap.rs b/bubblegum-backfill/src/worker/gap.rs new file mode 100644 index 000000000..07b88f4f5 --- /dev/null +++ b/bubblegum-backfill/src/worker/gap.rs @@ -0,0 +1,65 @@ +use anyhow::Result; +use clap::Parser; +use das_core::Rpc; +use futures::{stream::FuturesUnordered, StreamExt}; +use log::error; +use solana_sdk::signature::Signature; +use tokio::{ + sync::mpsc::{channel, Sender}, + task::JoinHandle, +}; + +use crate::gap::TreeGapFill; +use crate::BubblegumBackfillContext; + +#[derive(Parser, Debug, Clone)] +pub struct GapWorkerArgs { + /// The size of the signature channel. + #[arg(long, env, default_value = "1000")] + pub gap_channel_size: usize, + + /// The number of gap workers. + #[arg(long, env, default_value = "25")] + pub gap_worker_count: usize, +} + +impl GapWorkerArgs { + pub fn start( + &self, + context: BubblegumBackfillContext, + forward: Sender, + ) -> Result<(JoinHandle<()>, Sender)> { + let (gap_sender, mut gap_receiver) = channel::(self.gap_channel_size); + let gap_worker_count = self.gap_worker_count; + + let handler = tokio::spawn(async move { + let mut handlers = FuturesUnordered::new(); + let sender = forward.clone(); + + while let Some(gap) = gap_receiver.recv().await { + if handlers.len() >= gap_worker_count { + handlers.next().await; + } + + let client = context.solana_rpc.clone(); + let sender = sender.clone(); + + let handle = spawn_crawl_worker(client, sender, gap); + + handlers.push(handle); + } + + futures::future::join_all(handlers).await; + }); + + Ok((handler, gap_sender)) + } +} + +fn spawn_crawl_worker(client: Rpc, sender: Sender, gap: TreeGapFill) -> JoinHandle<()> { + tokio::spawn(async move { + if let Err(e) = gap.crawl(client, sender).await { + error!("tree transaction: {:?}", e); + } + }) +} diff --git a/bubblegum-backfill/src/worker/mod.rs b/bubblegum-backfill/src/worker/mod.rs new file mode 100644 index 000000000..0bc7f9e36 --- /dev/null +++ b/bubblegum-backfill/src/worker/mod.rs @@ -0,0 +1,9 @@ +mod gap; +mod program_transformer; +mod transaction; +mod tree; + +pub use gap::GapWorkerArgs; +pub use program_transformer::ProgramTransformerWorkerArgs; +pub use transaction::SignatureWorkerArgs; +pub use tree::TreeWorkerArgs; diff --git a/bubblegum-backfill/src/worker/program_transformer.rs b/bubblegum-backfill/src/worker/program_transformer.rs new file mode 100644 index 000000000..da90182b5 --- /dev/null +++ b/bubblegum-backfill/src/worker/program_transformer.rs @@ -0,0 +1,50 @@ +use anyhow::Result; +use clap::Parser; +use das_core::{create_download_metadata_notifier, DownloadMetadataInfo}; +use log::error; +use program_transformers::{ProgramTransformer, TransactionInfo}; +use tokio::sync::mpsc::{channel, Sender, UnboundedSender}; +use tokio::task::JoinHandle; + +use crate::BubblegumBackfillContext; + +#[derive(Parser, Debug, Clone)] +pub struct ProgramTransformerWorkerArgs { + #[arg(long, env, default_value = "100000")] + pub program_transformer_channel_size: usize, +} + +impl ProgramTransformerWorkerArgs { + pub fn start( + &self, + context: BubblegumBackfillContext, + forwarder: UnboundedSender, + ) -> Result<(JoinHandle<()>, Sender)> { + let (sender, mut receiver) = + channel::(self.program_transformer_channel_size); + + let handle = tokio::spawn(async move { + let mut transactions = Vec::new(); + let pool = context.database_pool.clone(); + + let download_metadata_notifier = create_download_metadata_notifier(forwarder).await; + + let program_transformer = + ProgramTransformer::new(pool, download_metadata_notifier, true); + + while let Some(gap) = receiver.recv().await { + transactions.push(gap); + } + + transactions.sort_by(|a, b| b.signature.cmp(&a.signature)); + + for transaction in transactions { + if let Err(e) = program_transformer.handle_transaction(&transaction).await { + error!("handle transaction: {:?}", e) + }; + } + }); + + Ok((handle, sender)) + } +} diff --git a/bubblegum-backfill/src/worker/transaction.rs b/bubblegum-backfill/src/worker/transaction.rs new file mode 100644 index 000000000..c047cffe4 --- /dev/null +++ b/bubblegum-backfill/src/worker/transaction.rs @@ -0,0 +1,210 @@ +use crate::error::ErrorKind; +use anyhow::Result; +use clap::Parser; +use das_core::Rpc; +use futures::{stream::FuturesUnordered, StreamExt}; +use log::error; +use program_transformers::TransactionInfo; +use solana_program::pubkey::Pubkey; +use solana_sdk::instruction::CompiledInstruction; +use solana_sdk::signature::Signature; +use solana_sdk::transaction::VersionedTransaction; +use solana_transaction_status::{ + option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, + InnerInstruction, InnerInstructions, UiInstruction, +}; +use tokio::{ + sync::mpsc::{channel, Sender}, + task::JoinHandle, +}; + +pub struct PubkeyString(pub String); + +impl TryFrom for Pubkey { + type Error = ErrorKind; + + fn try_from(value: PubkeyString) -> Result { + let decoded_bytes = bs58::decode(value.0) + .into_vec() + .map_err(|e| ErrorKind::Generic(e.to_string()))?; + + Pubkey::try_from(decoded_bytes) + .map_err(|_| ErrorKind::Generic("unable to convert pubkey".to_string())) + } +} + +pub struct FetchedEncodedTransactionWithStatusMeta(pub EncodedConfirmedTransactionWithStatusMeta); + +impl TryFrom for TransactionInfo { + type Error = ErrorKind; + + fn try_from( + fetched_transaction: FetchedEncodedTransactionWithStatusMeta, + ) -> Result { + let mut account_keys = Vec::new(); + let encoded_transaction_with_status_meta = fetched_transaction.0; + + let ui_transaction: VersionedTransaction = encoded_transaction_with_status_meta + .transaction + .transaction + .decode() + .ok_or(ErrorKind::Generic( + "unable to decode transaction".to_string(), + ))?; + + let signature = ui_transaction.signatures[0]; + + let msg = ui_transaction.message; + + let meta = encoded_transaction_with_status_meta + .transaction + .meta + .ok_or(ErrorKind::Generic( + "unable to get meta from transaction".to_string(), + ))?; + + for address in msg.static_account_keys().iter().copied() { + account_keys.push(address); + } + let ui_loaded_addresses = meta.loaded_addresses; + + let message_address_table_lookup = msg.address_table_lookups(); + + if message_address_table_lookup.is_some() { + if let OptionSerializer::Some(ui_lookup_table) = ui_loaded_addresses { + for address in ui_lookup_table.writable { + account_keys.push(PubkeyString(address).try_into()?); + } + + for address in ui_lookup_table.readonly { + account_keys.push(PubkeyString(address).try_into()?); + } + } + } + + let mut meta_inner_instructions = Vec::new(); + + let compiled_instruction = msg.instructions().to_vec(); + + let mut instructions = Vec::new(); + + for inner in compiled_instruction { + instructions.push(InnerInstruction { + stack_height: Some(0), + instruction: CompiledInstruction { + program_id_index: inner.program_id_index, + accounts: inner.accounts, + data: inner.data, + }, + }); + } + + meta_inner_instructions.push(InnerInstructions { + index: 0, + instructions, + }); + + if let OptionSerializer::Some(inner_instructions) = meta.inner_instructions { + for ix in inner_instructions { + let mut instructions = Vec::new(); + + for inner in ix.instructions { + if let UiInstruction::Compiled(compiled) = inner { + instructions.push(InnerInstruction { + stack_height: compiled.stack_height, + instruction: CompiledInstruction { + program_id_index: compiled.program_id_index, + accounts: compiled.accounts, + data: bs58::decode(compiled.data) + .into_vec() + .map_err(|e| ErrorKind::Generic(e.to_string()))?, + }, + }); + } + } + + meta_inner_instructions.push(InnerInstructions { + index: ix.index, + instructions, + }); + } + } + + Ok(Self { + slot: encoded_transaction_with_status_meta.slot, + account_keys, + signature, + message_instructions: msg.instructions().to_vec(), + meta_inner_instructions, + }) + } +} + +#[derive(Parser, Clone, Debug)] +pub struct SignatureWorkerArgs { + /// The size of the signature channel. + #[arg(long, env, default_value = "100000")] + pub signature_channel_size: usize, + /// The number of transaction workers. + #[arg(long, env, default_value = "50")] + pub signature_worker_count: usize, +} + +impl SignatureWorkerArgs { + pub fn start( + &self, + context: crate::BubblegumBackfillContext, + forwarder: Sender, + ) -> Result<(JoinHandle<()>, Sender)> { + let (sig_sender, mut sig_receiver) = channel::(self.signature_channel_size); + let worker_count = self.signature_worker_count; + + let handle = tokio::spawn(async move { + let mut handlers = FuturesUnordered::new(); + + while let Some(signature) = sig_receiver.recv().await { + if handlers.len() >= worker_count { + handlers.next().await; + } + + let solana_rpc = context.solana_rpc.clone(); + let transaction_sender = forwarder.clone(); + + let handle = spawn_transaction_worker(solana_rpc, transaction_sender, signature); + + handlers.push(handle); + } + + futures::future::join_all(handlers).await; + }); + + Ok((handle, sig_sender)) + } +} + +async fn queue_transaction<'a>( + client: Rpc, + sender: Sender, + signature: Signature, +) -> Result<(), ErrorKind> { + let transaction = client.get_transaction(&signature).await?; + + sender + .send(FetchedEncodedTransactionWithStatusMeta(transaction).try_into()?) + .await + .map_err(|e| ErrorKind::Generic(e.to_string()))?; + + Ok(()) +} + +fn spawn_transaction_worker( + client: Rpc, + sender: Sender, + signature: Signature, +) -> JoinHandle<()> { + tokio::spawn(async move { + if let Err(e) = queue_transaction(client, sender, signature).await { + error!("queue transaction: {:?}", e); + } + }) +} diff --git a/bubblegum-backfill/src/worker/tree.rs b/bubblegum-backfill/src/worker/tree.rs new file mode 100644 index 000000000..1d3e9b82b --- /dev/null +++ b/bubblegum-backfill/src/worker/tree.rs @@ -0,0 +1,114 @@ +use crate::{ + gap::{TreeGapFill, TreeGapModel}, + tree::TreeResponse, + BubblegumBackfillContext, +}; +use anyhow::Result; +use clap::Parser; +use das_core::MetadataJsonDownloadWorkerArgs; +use digital_asset_types::dao::cl_audits_v2; +use log::error; +use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, SqlxPostgresConnector}; +use solana_sdk::signature::Signature; +use tokio::task::JoinHandle; + +use super::{GapWorkerArgs, ProgramTransformerWorkerArgs, SignatureWorkerArgs}; + +#[derive(Debug, Clone, Parser)] +pub struct TreeWorkerArgs { + #[clap(flatten)] + pub metadata_json_download_worker: MetadataJsonDownloadWorkerArgs, + + #[clap(flatten)] + pub signature_worker: SignatureWorkerArgs, + + #[clap(flatten)] + pub gap_worker: GapWorkerArgs, + + #[clap(flatten)] + pub program_transformer_worker: ProgramTransformerWorkerArgs, +} +impl TreeWorkerArgs { + pub fn start( + &self, + context: BubblegumBackfillContext, + tree: TreeResponse, + ) -> JoinHandle> { + let db_pool = context.database_pool.clone(); + let metadata_json_download_db_pool = context.database_pool.clone(); + + let program_transformer_context = context.clone(); + let signature_context = context.clone(); + + let metadata_json_download_worker_args = self.metadata_json_download_worker.clone(); + let program_transformer_worker_args = self.program_transformer_worker.clone(); + let signature_worker_args = self.signature_worker.clone(); + let gap_worker_args = self.gap_worker.clone(); + + tokio::spawn(async move { + let (metadata_json_download_worker, metadata_json_download_sender) = + metadata_json_download_worker_args.start(metadata_json_download_db_pool)?; + + let (program_transformer_worker, transaction_info_sender) = + program_transformer_worker_args + .start(program_transformer_context, metadata_json_download_sender)?; + + let (signature_worker, signature_sender) = + signature_worker_args.start(signature_context, transaction_info_sender)?; + + let (gap_worker, tree_gap_sender) = gap_worker_args.start(context, signature_sender)?; + + { + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(db_pool); + + let mut gaps = TreeGapModel::find(&conn, tree.pubkey) + .await? + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?; + + let upper_known_seq = cl_audits_v2::Entity::find() + .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) + .order_by_desc(cl_audits_v2::Column::Seq) + .one(&conn) + .await?; + + let lower_known_seq = cl_audits_v2::Entity::find() + .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) + .order_by_asc(cl_audits_v2::Column::Seq) + .one(&conn) + .await?; + + if let Some(upper_seq) = upper_known_seq { + let signature = Signature::try_from(upper_seq.tx.as_ref())?; + + gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature))); + } else if tree.seq > 0 { + gaps.push(TreeGapFill::new(tree.pubkey, None, None)); + } + + if let Some(lower_seq) = lower_known_seq.filter(|seq| seq.seq > 1) { + let signature = Signature::try_from(lower_seq.tx.as_ref())?; + + gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None)); + } + + for gap in gaps { + if let Err(e) = tree_gap_sender.send(gap).await { + error!("send gap: {:?}", e); + } + } + } + + futures::future::try_join4( + gap_worker, + signature_worker, + program_transformer_worker, + metadata_json_download_worker, + ) + .await?; + + Ok(()) + }) + } +} diff --git a/core/Cargo.toml b/core/Cargo.toml index d0532684f..9ee91c67f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -8,18 +8,34 @@ publish.workspace = true [dependencies] anyhow = { workspace = true } backon = { workspace = true } +borsh = { workspace = true } +bs58 = { workspace = true } +cadence = { workspace = true } +cadence-macros = { workspace = true } +clap = { workspace = true, features = ["derive", "cargo", "env"] } +derive_more = { workspace = true } +digital_asset_types = { workspace = true } +figment = { workspace = true } +futures = { workspace = true } +indicatif = { workspace = true } +log = { workspace = true } +plerkle_messenger = { workspace = true } +reqwest = { workspace = true } +sea-orm = { workspace = true, features = [ + "sqlx-postgres", + "with-chrono", + "runtime-tokio-rustls", +] } +serde_json = { workspace = true } +spl-account-compression = { workspace = true } solana-account-decoder = { workspace = true } solana-client = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } -cadence = { workspace = true } -cadence-macros = { workspace = true } +sqlx = { workspace = true, fatures = ["runtime-tokio-rustls", "postgres"] } thiserror = { workspace = true } -figment = { workspace = true } -plerkle_messenger = { workspace = true } tokio = { workspace = true } -clap = { workspace = true, features = ["derive", "cargo", "env"] } -sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } +url = { workspace = true } [lints] workspace = true diff --git a/core/src/lib.rs b/core/src/lib.rs index da6bb050e..341c54817 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,9 +1,11 @@ mod db; +mod metadata_json; mod metrics; mod plerkle_messenger_queue; mod solana_rpc; pub use db::*; +pub use metadata_json::*; pub use metrics::*; pub use plerkle_messenger_queue::*; pub use solana_rpc::*; diff --git a/core/src/metadata_json.rs b/core/src/metadata_json.rs new file mode 100644 index 000000000..55c0dc64c --- /dev/null +++ b/core/src/metadata_json.rs @@ -0,0 +1,223 @@ +use { + backon::{ExponentialBuilder, Retryable}, + clap::Parser, + digital_asset_types::dao::asset_data, + futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}, + indicatif::HumanDuration, + log::{debug, error}, + reqwest::{Client, Url as ReqwestUrl}, + sea_orm::{entity::*, SqlxPostgresConnector}, + tokio::{ + sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender}, + task::JoinHandle, + time::Instant, + }, +}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DownloadMetadataInfo { + asset_data_id: Vec, + uri: String, +} + +impl DownloadMetadataInfo { + pub fn new(asset_data_id: Vec, uri: String) -> Self { + Self { + asset_data_id, + uri: uri.trim().replace('\0', ""), + } + } + + pub fn into_inner(self) -> (Vec, String) { + (self.asset_data_id, self.uri) + } +} + +pub type DownloadMetadataNotifier = Box< + dyn Fn( + DownloadMetadataInfo, + ) -> BoxFuture<'static, Result<(), Box>> + + Sync + + Send, +>; + +pub async fn create_download_metadata_notifier( + download_metadata_json_sender: UnboundedSender, +) -> DownloadMetadataNotifier { + Box::new(move |info: DownloadMetadataInfo| -> BoxFuture<'static, Result<(), Box>> + { + let task = download_metadata_json_sender.send(info).map_err(Into::into); + + Box::pin(async move { task }) + }) +} + +#[derive(Parser, Clone, Debug)] +pub struct MetadataJsonDownloadWorkerArgs { + /// The number of worker threads + #[arg(long, env, default_value = "25")] + metadata_json_download_worker_count: usize, + /// The request timeout in milliseconds + #[arg(long, env, default_value = "1000")] + metadata_json_download_worker_request_timeout: u64, +} + +impl MetadataJsonDownloadWorkerArgs { + pub fn start( + &self, + pool: sqlx::PgPool, + ) -> Result< + (JoinHandle<()>, UnboundedSender), + MetadataJsonDownloadWorkerError, + > { + let (sender, mut rx) = unbounded_channel::(); + let worker_count = self.metadata_json_download_worker_count; + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_millis( + self.metadata_json_download_worker_request_timeout, + )) + .build()?; + + let handle = tokio::spawn(async move { + let mut handlers = FuturesUnordered::new(); + + while let Some(download_metadata_info) = rx.recv().await { + if handlers.len() >= worker_count { + handlers.next().await; + } + + let pool = pool.clone(); + let client = client.clone(); + + handlers.push(spawn_task(client, pool, download_metadata_info)); + } + + while handlers.next().await.is_some() {} + }); + + Ok((handle, sender)) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum MetadataJsonDownloadWorkerError { + #[error("send error: {0}")] + Send(#[from] SendError), + #[error("join error: {0}")] + Join(#[from] tokio::task::JoinError), + #[error("reqwest: {0}")] + Reqwest(#[from] reqwest::Error), +} + +fn spawn_task( + client: Client, + pool: sqlx::PgPool, + download_metadata_info: DownloadMetadataInfo, +) -> JoinHandle<()> { + tokio::spawn(async move { + let timing = Instant::now(); + let asset_data_id = + bs58::encode(download_metadata_info.asset_data_id.clone()).into_string(); + + if let Err(e) = perform_metadata_json_task(client, pool, download_metadata_info).await { + error!("Asset {} failed: {}", asset_data_id, e); + } + + debug!( + "Asset {} finished in {}", + asset_data_id, + HumanDuration(timing.elapsed()) + ); + }) +} + +#[derive(thiserror::Error, Debug)] +pub enum FetchMetadataJsonError { + #[error("reqwest: {0}")] + GenericReqwest(#[from] reqwest::Error), + #[error("json parse for url({url}) with {source}")] + Parse { + source: reqwest::Error, + url: ReqwestUrl, + }, + #[error("response {status} for url ({url}) with {source}")] + Response { + source: reqwest::Error, + url: ReqwestUrl, + status: StatusCode, + }, + #[error("url parse: {0}")] + Url(#[from] url::ParseError), +} + +#[derive(Debug, derive_more::Display)] +pub enum StatusCode { + Unknown, + Code(reqwest::StatusCode), +} + +async fn fetch_metadata_json( + client: Client, + metadata_json_url: &str, +) -> Result { + (|| async { + let url = ReqwestUrl::parse(metadata_json_url)?; + + let response = client.get(url.clone()).send().await?; + + match response.error_for_status() { + Ok(res) => res + .json::() + .await + .map_err(|source| FetchMetadataJsonError::Parse { source, url }), + Err(source) => { + let status = source + .status() + .map(StatusCode::Code) + .unwrap_or(StatusCode::Unknown); + + Err(FetchMetadataJsonError::Response { + source, + url, + status, + }) + } + } + }) + .retry(&ExponentialBuilder::default()) + .await +} + +#[derive(thiserror::Error, Debug)] +pub enum MetadataJsonTaskError { + #[error("sea orm: {0}")] + SeaOrm(#[from] sea_orm::DbErr), + #[error("metadata json: {0}")] + Fetch(#[from] FetchMetadataJsonError), + #[error("asset not found in the db")] + AssetNotFound, +} + +pub async fn perform_metadata_json_task( + client: Client, + pool: sqlx::PgPool, + download_metadata_info: DownloadMetadataInfo, +) -> Result { + match fetch_metadata_json(client, &download_metadata_info.uri).await { + Ok(metadata) => { + let active_model = asset_data::ActiveModel { + id: Set(download_metadata_info.asset_data_id), + metadata: Set(metadata), + reindex: Set(Some(false)), + ..Default::default() + }; + + let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + + let model = active_model.update(&conn).await?; + + Ok(model) + } + Err(e) => Err(MetadataJsonTaskError::Fetch(e)), + } +} diff --git a/nft_ingester/Cargo.toml b/nft_ingester/Cargo.toml index fd17a2bed..0b8494cd1 100644 --- a/nft_ingester/Cargo.toml +++ b/nft_ingester/Cargo.toml @@ -14,6 +14,7 @@ cadence = { workspace = true } cadence-macros = { workspace = true } chrono = { workspace = true } clap = { workspace = true, features = ["derive", "cargo"] } +das-core = { workspace = true } digital_asset_types = { workspace = true, features = [ "json_types", "sql_types", diff --git a/nft_ingester/src/tasks/common/mod.rs b/nft_ingester/src/tasks/common/mod.rs index 17ec935a0..a74c650be 100644 --- a/nft_ingester/src/tasks/common/mod.rs +++ b/nft_ingester/src/tasks/common/mod.rs @@ -2,10 +2,10 @@ use { super::{BgTask, FromTaskData, IngesterError, IntoTaskData, TaskData}, async_trait::async_trait, chrono::{NaiveDateTime, Utc}, + das_core::{DownloadMetadataInfo, DownloadMetadataNotifier}, digital_asset_types::dao::asset_data, futures::future::BoxFuture, log::debug, - program_transformers::{DownloadMetadataInfo, DownloadMetadataNotifier}, reqwest::{Client, ClientBuilder}, sea_orm::*, serde::{Deserialize, Serialize}, diff --git a/ops/Cargo.toml b/ops/Cargo.toml index 48c0750e1..87e55f01f 100644 --- a/ops/Cargo.toml +++ b/ops/Cargo.toml @@ -11,12 +11,12 @@ name = "das-ops" [dependencies] anchor-client = { workspace = true } anyhow = { workspace = true } -backon = { workspace = true } borsh = { workspace = true } bs58 = { workspace = true } cadence = { workspace = true } cadence-macros = { workspace = true } clap = { workspace = true, features = ["derive", "cargo", "env"] } +das-bubblegum-backfill = { workspace = true } das-core = { workspace = true } digital_asset_types = { workspace = true } env_logger = { workspace = true } diff --git a/ops/src/account/account_info.rs b/ops/src/account/account_info.rs index b6216b331..6ce48a544 100644 --- a/ops/src/account/account_info.rs +++ b/ops/src/account/account_info.rs @@ -17,7 +17,7 @@ pub async fn fetch(rpc: &Rpc, pubkey: Pubkey) -> Result --messenger-redis-url --solana-rpc-url - -Options: - --tree-crawler-count - Number of tree crawler workers [env: TREE_CRAWLER_COUNT=] [default: 20] - --signature-channel-size - The size of the signature channel [env: SIGNATURE_CHANNEL_SIZE=] [default: 10000] - --gap-channel-size - The size of the signature channel [env: GAP_CHANNEL_SIZE=] [default: 1000] - --transaction-worker-count - The number of transaction workers [env: TRANSACTION_WORKER_COUNT=] [default: 100] - --gap-worker-count - The number of gap workers [env: GAP_WORKER_COUNT=] [default: 25] - --only-trees - The list of trees to crawl. If not specified, all trees will be crawled [env: ONLY_TREES=] - --database-url - The database URL [env: DATABASE_URL=] - --database-max-connections - The maximum number of connections to the database [env: DATABASE_MAX_CONNECTIONS=] [default: 125] - --database-min-connections - The minimum number of connections to the database [env: DATABASE_MIN_CONNECTIONS=] [default: 5] - --messenger-redis-url - [env: MESSENGER_REDIS_URL=] - --messenger-redis-batch-size - [env: MESSENGER_REDIS_BATCH_SIZE=] [default: 100] - --messenger-queue-connections - [env: MESSENGER_QUEUE_CONNECTIONS=] [default: 25] - --messenger-queue-stream - [env: MESSENGER_QUEUE_STREAM=] [default: TXNFILL] - --metrics-host - [env: METRICS_HOST=] [default: 127.0.0.1] - --metrics-port - [env: METRICS_PORT=] [default: 8125] - --metrics-prefix - [env: METRICS_PREFIX=] [default: das.backfiller] - --solana-rpc-url - [env: SOLANA_RPC_URL=] - -h, --help - Print help -``` - -### Metrics - -The bubblegum command provides several metrics for monitoring performance and status: - -Metric | Description ---- | --- -transaction.failed | Count of failed transaction -transaction.succeeded | Count of successfully queued transaction -transaction.queued | Time for a transaction to be queued -gap.failed | Count of failed gap crawling -gap.succeeded | Count of successfully crawled gaps -gap.queued | Time for a gap to be queued -tree.succeeded | Count of completed tree crawl -tree.crawled | Time to crawl a tree -job.completed | Time to complete the job +**warning**: The command expects full archive access to transactions. Before proceeding ensure your RPC is able to serve complete transaction history for Solana. \ No newline at end of file diff --git a/ops/src/bubblegum/audit.rs b/ops/src/bubblegum/audit.rs deleted file mode 100644 index deee3c4dd..000000000 --- a/ops/src/bubblegum/audit.rs +++ /dev/null @@ -1,89 +0,0 @@ -use anyhow::Result; -use clap::Parser; -use das_core::{connect_db, MetricsArgs, PoolArgs, Rpc, SolanaRpcArgs}; -use digital_asset_types::dao::cl_audits_v2; -use futures::future; -use sea_orm::{CursorTrait, EntityTrait, SqlxPostgresConnector}; -use solana_sdk::signature::Signature; -use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta; - -use tokio::io::{stdout, AsyncWriteExt}; - -#[derive(Debug, Parser, Clone)] -pub struct Args { - /// Database configuration - #[clap(flatten)] - pub database: PoolArgs, - - /// Metrics configuration - #[clap(flatten)] - pub metrics: MetricsArgs, - - /// Solana configuration - #[clap(flatten)] - pub solana: SolanaRpcArgs, - - #[arg(long, env, default_value = "10000")] - pub batch_size: u64, -} - -/// The `audit` commands checks `cl_audits_v2` for any failed transactions and logs them to stdout. -pub async fn run(config: Args) -> Result<()> { - let pool = connect_db(&config.database).await?; - - let solana_rpc = Rpc::from_config(&config.solana); - - let mut output = stdout(); - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - let mut after = None; - - loop { - let mut query = cl_audits_v2::Entity::find().cursor_by(cl_audits_v2::Column::Id); - let mut query = query.first(config.batch_size); - - if let Some(after) = after { - query = query.after(after); - } - - let entries = query.all(&conn).await?; - - let mut transactions = vec![]; - - for entry in entries.clone() { - transactions.push(fetch_transaction(entry, solana_rpc.clone())); - } - - let transactions = future::join_all(transactions).await; - - for (signature, transaction) in transactions.into_iter().flatten() { - if let Some(meta) = transaction.transaction.meta { - if meta.err.is_some() { - output - .write_all(format!("{}\n", signature).as_bytes()) - .await?; - - output.flush().await?; - } - } - } - - after = entries.last().map(|cl_audit_v2| cl_audit_v2.id); - - if entries.is_empty() { - break; - } - } - - Ok(()) -} - -async fn fetch_transaction( - entry: cl_audits_v2::Model, - solana_rpc: Rpc, -) -> Result<(Signature, EncodedConfirmedTransactionWithStatusMeta)> { - let signature = Signature::try_from(entry.tx.as_ref())?; - - let transaction = solana_rpc.get_transaction(&signature).await?; - - Ok((signature, transaction)) -} diff --git a/ops/src/bubblegum/backfiller.rs b/ops/src/bubblegum/backfiller.rs index fbfb3bf7c..1c6c58122 100644 --- a/ops/src/bubblegum/backfiller.rs +++ b/ops/src/bubblegum/backfiller.rs @@ -1,64 +1,20 @@ -use super::{ - tree::{TreeGapFill, TreeGapModel, TreeResponse}, - BubblegumOpsErrorKind, -}; use anyhow::Result; -use cadence_macros::{statsd_count, statsd_time}; use clap::Parser; -use das_core::{connect_db, setup_metrics, MetricsArgs, PoolArgs, Rpc, SolanaRpcArgs}; -use digital_asset_types::dao::cl_audits_v2; -use futures::future::{ready, FutureExt}; -use futures::{stream::FuturesUnordered, StreamExt}; -use indicatif::HumanDuration; -use log::{debug, error}; -use program_transformers::{ProgramTransformer, TransactionInfo}; -use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, SqlxPostgresConnector}; -use solana_program::pubkey::Pubkey; -use solana_sdk::instruction::CompiledInstruction; -use solana_sdk::signature::Signature; -use solana_sdk::transaction::VersionedTransaction; -use solana_transaction_status::{ - option_serializer::OptionSerializer, EncodedConfirmedTransactionWithStatusMeta, - InnerInstruction, InnerInstructions, UiInstruction, +use das_bubblegum_backfill::{ + start_bubblegum_backfill, BubblegumBackfillArgs, BubblegumBackfillContext, }; -use sqlx::PgPool; -use std::time::Instant; -use tokio::{sync::mpsc, task::JoinHandle}; +use das_core::{connect_db, PoolArgs, Rpc, SolanaRpcArgs}; #[derive(Debug, Parser, Clone)] pub struct Args { - /// Number of tree crawler workers - #[arg(long, env, default_value = "20")] - pub tree_crawler_count: usize, - - /// The size of the signature channel. - #[arg(long, env, default_value = "100000")] - pub signature_channel_size: usize, - - /// The size of the signature channel. - #[arg(long, env, default_value = "1000")] - pub gap_channel_size: usize, - - /// The number of transaction workers. - #[arg(long, env, default_value = "50")] - pub transaction_worker_count_per_tree: usize, - - /// The number of gap workers. - #[arg(long, env, default_value = "25")] - pub gap_worker_count: usize, - - /// The list of trees to crawl. If not specified, all trees will be crawled. - #[arg(long, env, use_value_delimiter = true)] - pub only_trees: Option>, + /// Backfill Bubblegum Args + #[clap(flatten)] + pub backfill_bubblegum: BubblegumBackfillArgs, /// Database configuration #[clap(flatten)] pub database: PoolArgs, - /// Metrics configuration - #[clap(flatten)] - pub metrics: MetricsArgs, - /// Solana configuration #[clap(flatten)] pub solana: SolanaRpcArgs, @@ -66,17 +22,14 @@ pub struct Args { /// Executes the backfilling operation for the tree crawler. /// -/// This function sets up the essential components required for the backfilling operation, -/// including database connections, RPC clients, and worker managers to handle -/// transactions and gaps. It retrieves the necessary trees for crawling and orchestrates -/// the crawling operation across various workers. +/// This function initializes the necessary components for the backfilling operation, +/// such as database connections and RPC clients, and then delegates the actual +/// backfilling logic to the `das_bubblegum_backfill` crate. /// /// The function undertakes the following key tasks: /// - Establishes database connections and initializes RPC clients. -/// - Configures channels for inter-component communication. -/// - Deploys worker managers to handle transactions and gaps. -/// - Retrieves trees from the database and oversees their crawling. -/// - Monitors metrics and logs activities throughout the operation. +/// - Creates a context for the backfilling operation. +/// - Invokes the `start_bubblegum_backfill` function from the `das_bubblegum_backfill` crate. /// /// # Arguments /// @@ -91,378 +44,12 @@ pub struct Args { /// /// # Errors /// -/// Potential errors can arise from database connectivity issues, RPC failures, -/// or complications in spawning and managing worker tasks. +/// Potential errors can arise from database connectivity issues or RPC failures. pub async fn run(config: Args) -> Result<()> { - let pool = connect_db(&config.database).await?; + let database_pool = connect_db(&config.database).await?; let solana_rpc = Rpc::from_config(&config.solana); + let context = BubblegumBackfillContext::new(database_pool, solana_rpc); - setup_metrics(&config.metrics)?; - - let started = Instant::now(); - - let trees = if let Some(ref only_trees) = config.only_trees { - TreeResponse::find(&solana_rpc, only_trees.clone()).await? - } else { - TreeResponse::all(&solana_rpc).await? - }; - - let tree_count = trees.len(); - - debug!( - "fetched {} trees in {}", - tree_count, - HumanDuration(started.elapsed()) - ); - - let tree_crawler_count = config.tree_crawler_count; - let mut crawl_handles = FuturesUnordered::new(); - - for tree in trees { - if crawl_handles.len() >= tree_crawler_count { - crawl_handles.next().await; - } - - let pool = pool.clone(); - let solana_rpc = solana_rpc.clone(); - - let handle = spawn_tree_worker(&config, pool, solana_rpc, tree); - - crawl_handles.push(handle); - } - - futures::future::try_join_all(crawl_handles).await?; - - statsd_time!("job.completed", started.elapsed()); - - debug!( - "crawled {} trees in {}", - tree_count, - HumanDuration(started.elapsed()) - ); - - Ok(()) -} - -fn spawn_tree_worker( - config: &Args, - pool: PgPool, - rpc: Rpc, - tree: TreeResponse, -) -> JoinHandle> { - let config = config.clone(); - let gap_solana_rpc = rpc.clone(); - let gap_pool = pool.clone(); - - tokio::spawn(async move { - let timing = Instant::now(); - - let transaction_worker_count = config.transaction_worker_count_per_tree; - - let (sig_sender, mut sig_receiver) = - mpsc::channel::(config.signature_channel_size); - let gap_sig_sender = sig_sender.clone(); - - let (gap_sender, mut gap_receiver) = mpsc::channel::(config.gap_channel_size); - let (transaction_sender, mut transaction_receiver) = - mpsc::channel::(config.signature_channel_size); - - let signature_worker_manager = tokio::spawn(async move { - let mut handlers = FuturesUnordered::new(); - - while let Some(signature) = sig_receiver.recv().await { - if handlers.len() >= transaction_worker_count { - handlers.next().await; - } - - let solana_rpc = rpc.clone(); - let transaction_sender = transaction_sender.clone(); - - let handle = spawn_transaction_worker(solana_rpc, transaction_sender, signature); - - handlers.push(handle); - } - - futures::future::join_all(handlers).await; - - drop(transaction_sender); - }); - - let gap_worker_count = config.gap_worker_count; - - let gap_worker_manager = tokio::spawn(async move { - let mut handlers = FuturesUnordered::new(); - let sender = gap_sig_sender.clone(); - - while let Some(gap) = gap_receiver.recv().await { - if handlers.len() >= gap_worker_count { - handlers.next().await; - } - - let client = gap_solana_rpc.clone(); - let sender = sender.clone(); - - let handle = spawn_crawl_worker(client, sender, gap); - - handlers.push(handle); - } - - futures::future::join_all(handlers).await; - - drop(sig_sender); - }); - - let transaction_worker_manager = tokio::spawn(async move { - let mut transactions = Vec::new(); - let pool = pool.clone(); - - let program_transformer = - ProgramTransformer::new(pool, Box::new(|_info| ready(Ok(())).boxed()), true); - - while let Some(gap) = transaction_receiver.recv().await { - transactions.push(gap); - } - - transactions.sort_by(|a, b| b.signature.cmp(&a.signature)); - - for transaction in transactions { - if let Err(e) = program_transformer.handle_transaction(&transaction).await { - error!("handle transaction: {:?}", e) - }; - } - }); - - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(gap_pool); - - let mut gaps = TreeGapModel::find(&conn, tree.pubkey) - .await? - .into_iter() - .map(TryInto::try_into) - .collect::, _>>()?; - - let upper_known_seq = cl_audits_v2::Entity::find() - .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) - .order_by_desc(cl_audits_v2::Column::Seq) - .one(&conn) - .await?; - - let lower_known_seq = cl_audits_v2::Entity::find() - .filter(cl_audits_v2::Column::Tree.eq(tree.pubkey.as_ref().to_vec())) - .order_by_asc(cl_audits_v2::Column::Seq) - .one(&conn) - .await?; - - drop(conn); - - if let Some(upper_seq) = upper_known_seq { - let signature = Signature::try_from(upper_seq.tx.as_ref())?; - - gaps.push(TreeGapFill::new(tree.pubkey, None, Some(signature))); - } else if tree.seq > 0 { - gaps.push(TreeGapFill::new(tree.pubkey, None, None)); - } - - if let Some(lower_seq) = lower_known_seq.filter(|seq| seq.seq > 1) { - let signature = Signature::try_from(lower_seq.tx.as_ref())?; - - gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None)); - } - - for gap in gaps { - if let Err(e) = gap_sender.send(gap).await { - statsd_count!("gap.failed", 1); - error!("send gap: {:?}", e); - } - } - - drop(gap_sender); - gap_worker_manager.await?; - - signature_worker_manager.await?; - - transaction_worker_manager.await?; - - statsd_count!("tree.succeeded", 1); - statsd_time!("tree.crawled", timing.elapsed()); - - Ok::<(), anyhow::Error>(()) - }) -} - -fn spawn_crawl_worker( - client: Rpc, - sender: mpsc::Sender, - gap: TreeGapFill, -) -> JoinHandle<()> { - tokio::spawn(async move { - let timing = Instant::now(); - - if let Err(e) = gap.crawl(client, sender).await { - error!("tree transaction: {:?}", e); - - statsd_count!("gap.failed", 1); - } else { - statsd_count!("gap.succeeded", 1); - } - - statsd_time!("gap.queued", timing.elapsed()); - }) -} - -pub struct FetchedEncodedTransactionWithStatusMeta(pub EncodedConfirmedTransactionWithStatusMeta); - -impl TryFrom for TransactionInfo { - type Error = BubblegumOpsErrorKind; - - fn try_from( - fetched_transaction: FetchedEncodedTransactionWithStatusMeta, - ) -> Result { - let mut account_keys = Vec::new(); - let encoded_transaction_with_status_meta = fetched_transaction.0; - - let ui_transaction: VersionedTransaction = encoded_transaction_with_status_meta - .transaction - .transaction - .decode() - .ok_or(BubblegumOpsErrorKind::Generic( - "unable to decode transaction".to_string(), - ))?; - - let signature = ui_transaction.signatures[0]; - - let msg = ui_transaction.message; - - let meta = encoded_transaction_with_status_meta - .transaction - .meta - .ok_or(BubblegumOpsErrorKind::Generic( - "unable to get meta from transaction".to_string(), - ))?; - - for address in msg.static_account_keys().iter().copied() { - account_keys.push(address); - } - let ui_loaded_addresses = meta.loaded_addresses; - - let message_address_table_lookup = msg.address_table_lookups(); - - if message_address_table_lookup.is_some() { - if let OptionSerializer::Some(ui_lookup_table) = ui_loaded_addresses { - for address in ui_lookup_table.writable { - account_keys.push(PubkeyString(address).try_into()?); - } - - for address in ui_lookup_table.readonly { - account_keys.push(PubkeyString(address).try_into()?); - } - } - } - - let mut meta_inner_instructions = Vec::new(); - - let compiled_instruction = msg.instructions().to_vec(); - - let mut instructions = Vec::new(); - - for inner in compiled_instruction { - instructions.push(InnerInstruction { - stack_height: Some(0), - instruction: CompiledInstruction { - program_id_index: inner.program_id_index, - accounts: inner.accounts, - data: inner.data, - }, - }); - } - - meta_inner_instructions.push(InnerInstructions { - index: 0, - instructions, - }); - - if let OptionSerializer::Some(inner_instructions) = meta.inner_instructions { - for ix in inner_instructions { - let mut instructions = Vec::new(); - - for inner in ix.instructions { - if let UiInstruction::Compiled(compiled) = inner { - instructions.push(InnerInstruction { - stack_height: compiled.stack_height, - instruction: CompiledInstruction { - program_id_index: compiled.program_id_index, - accounts: compiled.accounts, - data: bs58::decode(compiled.data) - .into_vec() - .map_err(|e| BubblegumOpsErrorKind::Generic(e.to_string()))?, - }, - }); - } - } - - meta_inner_instructions.push(InnerInstructions { - index: ix.index, - instructions, - }); - } - } - - Ok(Self { - slot: encoded_transaction_with_status_meta.slot, - account_keys, - signature, - message_instructions: msg.instructions().to_vec(), - meta_inner_instructions, - }) - } -} - -async fn queue_transaction<'a>( - client: Rpc, - sender: mpsc::Sender, - signature: Signature, -) -> Result<(), BubblegumOpsErrorKind> { - let transaction = client.get_transaction(&signature).await?; - - sender - .send(FetchedEncodedTransactionWithStatusMeta(transaction).try_into()?) - .await - .map_err(|e| BubblegumOpsErrorKind::Generic(e.to_string()))?; - - Ok(()) -} - -fn spawn_transaction_worker( - client: Rpc, - sender: mpsc::Sender, - signature: Signature, -) -> JoinHandle<()> { - tokio::spawn(async move { - let timing = Instant::now(); - - if let Err(e) = queue_transaction(client, sender, signature).await { - error!("queue transaction: {:?}", e); - - statsd_count!("transaction.failed", 1); - } else { - statsd_count!("transaction.succeeded", 1); - } - - statsd_time!("transaction.queued", timing.elapsed()); - }) -} - -pub struct PubkeyString(pub String); - -impl TryFrom for Pubkey { - type Error = BubblegumOpsErrorKind; - - fn try_from(value: PubkeyString) -> Result { - let decoded_bytes = bs58::decode(value.0) - .into_vec() - .map_err(|e| BubblegumOpsErrorKind::Generic(e.to_string()))?; - - Pubkey::try_from(decoded_bytes) - .map_err(|_| BubblegumOpsErrorKind::Generic("unable to convert pubkey".to_string())) - } + start_bubblegum_backfill(context, config.backfill_bubblegum).await } diff --git a/ops/src/bubblegum/cmd.rs b/ops/src/bubblegum/cmd.rs index b19715483..1004a227d 100644 --- a/ops/src/bubblegum/cmd.rs +++ b/ops/src/bubblegum/cmd.rs @@ -1,27 +1,6 @@ -use super::{audit, backfiller}; +use super::backfiller; use anyhow::Result; use clap::{Args, Subcommand}; -use thiserror::Error as ThisError; - -#[derive(ThisError, Debug)] -pub enum BubblegumOpsErrorKind { - #[error("anchor")] - Anchor(#[from] anchor_client::anchor_lang::error::Error), - #[error("solana rpc")] - Rpc(#[from] solana_client::client_error::ClientError), - #[error("parse pubkey")] - ParsePubkey(#[from] solana_sdk::pubkey::ParsePubkeyError), - #[error("serialize tree response")] - SerializeTreeResponse, - #[error("sea orm")] - Database(#[from] sea_orm::DbErr), - #[error("try from pubkey")] - TryFromPubkey, - #[error("try from signature")] - TryFromSignature, - #[error("generic error: {0}")] - Generic(String), -} #[derive(Debug, Clone, Subcommand)] pub enum Commands { @@ -29,9 +8,6 @@ pub enum Commands { /// It crawls through trees and backfills any missed tree transactions. #[clap(name = "backfill")] Backfill(backfiller::Args), - /// The `audit` commands checks `cl_audits_v2` for any failed transactions and logs them to stdout. - #[clap(name = "audit")] - Audit(audit::Args), } #[derive(Debug, Clone, Args)] @@ -45,9 +21,6 @@ pub async fn subcommand(subcommand: BubblegumCommand) -> Result<()> { Commands::Backfill(args) => { backfiller::run(args).await?; } - Commands::Audit(args) => { - audit::run(args).await?; - } } Ok(()) diff --git a/ops/src/bubblegum/mod.rs b/ops/src/bubblegum/mod.rs index eb4c867ad..d798b6e9e 100644 --- a/ops/src/bubblegum/mod.rs +++ b/ops/src/bubblegum/mod.rs @@ -1,6 +1,4 @@ -mod audit; mod backfiller; mod cmd; -mod tree; pub use cmd::*; diff --git a/ops/src/bubblegum/tree.rs b/ops/src/bubblegum/tree.rs deleted file mode 100644 index 14685ba40..000000000 --- a/ops/src/bubblegum/tree.rs +++ /dev/null @@ -1,260 +0,0 @@ -use super::BubblegumOpsErrorKind; -use anyhow::Result; -use borsh::BorshDeserialize; -use clap::Args; -use das_core::Rpc; -use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value}; -use solana_client::rpc_filter::{Memcmp, RpcFilterType}; -use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature; -use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature}; -use spl_account_compression::id; -use spl_account_compression::state::{ - merkle_tree_get_size, ConcurrentMerkleTreeHeader, CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1, -}; -use std::str::FromStr; -use tokio::sync::mpsc::Sender; - -const GET_SIGNATURES_FOR_ADDRESS_LIMIT: usize = 1000; - -#[derive(Debug, Clone, Args)] -pub struct ConfigBackfiller { - /// Solana RPC URL - #[arg(long, env)] - pub solana_rpc_url: String, -} - -const TREE_GAP_SQL: &str = r#" -WITH sequenced_data AS ( - SELECT - tree, - seq, - LEAD(seq) OVER (ORDER BY seq ASC) AS next_seq, - tx AS current_tx, - LEAD(tx) OVER (ORDER BY seq ASC) AS next_tx - FROM - cl_audits_v2 - WHERE - tree = $1 -), -gaps AS ( - SELECT - tree, - seq AS gap_start_seq, - next_seq AS gap_end_seq, - current_tx AS lower_bound_tx, - next_tx AS upper_bound_tx - FROM - sequenced_data - WHERE - next_seq IS NOT NULL AND - next_seq - seq > 1 -) -SELECT - tree, - gap_start_seq, - gap_end_seq, - lower_bound_tx, - upper_bound_tx -FROM - gaps -ORDER BY - gap_start_seq; -"#; - -#[derive(Debug, FromQueryResult, PartialEq, Clone)] -pub struct TreeGapModel { - pub tree: Vec, - pub gap_start_seq: i64, - pub gap_end_seq: i64, - pub lower_bound_tx: Vec, - pub upper_bound_tx: Vec, -} - -impl TreeGapModel { - pub async fn find( - conn: &DatabaseConnection, - tree: Pubkey, - ) -> Result, BubblegumOpsErrorKind> { - let statement = Statement::from_sql_and_values( - DbBackend::Postgres, - TREE_GAP_SQL, - vec![Value::Bytes(Some(Box::new(tree.as_ref().to_vec())))], - ); - - TreeGapModel::find_by_statement(statement) - .all(conn) - .await - .map_err(Into::into) - } -} - -impl TryFrom for TreeGapFill { - type Error = BubblegumOpsErrorKind; - - fn try_from(model: TreeGapModel) -> Result { - let tree = - Pubkey::try_from(model.tree).map_err(|_| BubblegumOpsErrorKind::TryFromPubkey)?; - let upper = Signature::try_from(model.upper_bound_tx) - .map_err(|_| BubblegumOpsErrorKind::TryFromSignature)?; - let lower = Signature::try_from(model.lower_bound_tx) - .map_err(|_| BubblegumOpsErrorKind::TryFromSignature)?; - - Ok(Self::new(tree, Some(upper), Some(lower))) - } -} - -pub struct TreeGapFill { - tree: Pubkey, - before: Option, - until: Option, -} - -impl TreeGapFill { - pub fn new(tree: Pubkey, before: Option, until: Option) -> Self { - Self { - tree, - before, - until, - } - } - - pub async fn crawl(&self, client: Rpc, sender: Sender) -> Result<()> { - let mut before = self.before; - - loop { - let sigs = client - .get_signatures_for_address(&self.tree, before, self.until) - .await?; - let sig_count = sigs.len(); - - let successful_transactions = sigs - .into_iter() - .filter(|transaction| transaction.err.is_none()) - .collect::>(); - - for sig in successful_transactions.iter() { - let sig = Signature::from_str(&sig.signature)?; - - sender.send(sig).await?; - - before = Some(sig); - } - - if sig_count < GET_SIGNATURES_FOR_ADDRESS_LIMIT { - break; - } - } - - Ok(()) - } -} - -#[derive(Debug, Clone)] -pub struct TreeHeaderResponse { - pub max_depth: u32, - pub max_buffer_size: u32, - pub creation_slot: u64, - pub size: usize, -} - -impl TryFrom for TreeHeaderResponse { - type Error = BubblegumOpsErrorKind; - - fn try_from(payload: ConcurrentMerkleTreeHeader) -> Result { - let size = merkle_tree_get_size(&payload)?; - - Ok(Self { - max_depth: payload.get_max_depth(), - max_buffer_size: payload.get_max_buffer_size(), - creation_slot: payload.get_creation_slot(), - size, - }) - } -} - -#[derive(Debug, Clone)] -pub struct TreeResponse { - pub pubkey: Pubkey, - pub tree_header: TreeHeaderResponse, - pub seq: u64, -} - -impl TreeResponse { - pub fn try_from_rpc(pubkey: Pubkey, account: Account) -> Result { - let bytes = account.data.as_slice(); - - let (header_bytes, rest) = bytes.split_at(CONCURRENT_MERKLE_TREE_HEADER_SIZE_V1); - let header: ConcurrentMerkleTreeHeader = - ConcurrentMerkleTreeHeader::try_from_slice(header_bytes)?; - - let merkle_tree_size = merkle_tree_get_size(&header)?; - let (tree_bytes, _canopy_bytes) = rest.split_at(merkle_tree_size); - - let seq_bytes = tree_bytes[0..8].try_into()?; - let seq = u64::from_le_bytes(seq_bytes); - - let (auth, _) = Pubkey::find_program_address(&[pubkey.as_ref()], &mpl_bubblegum::ID); - - header.assert_valid_authority(&auth)?; - - let tree_header = header.try_into()?; - - Ok(Self { - pubkey, - tree_header, - seq, - }) - } - - pub async fn all(client: &Rpc) -> Result, BubblegumOpsErrorKind> { - Ok(client - .get_program_accounts( - &id(), - Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( - 0, - vec![1u8], - ))]), - ) - .await? - .into_iter() - .filter_map(|(pubkey, account)| Self::try_from_rpc(pubkey, account).ok()) - .collect()) - } - - pub async fn find( - client: &Rpc, - pubkeys: Vec, - ) -> Result, BubblegumOpsErrorKind> { - let pubkeys: Vec = pubkeys - .into_iter() - .map(|p| Pubkey::from_str(&p)) - .collect::, _>>()?; - let pubkey_batches = pubkeys.chunks(100); - let pubkey_batches_count = pubkey_batches.len(); - - let mut gma_handles = Vec::with_capacity(pubkey_batches_count); - - for batch in pubkey_batches { - gma_handles.push(async move { - let accounts = client.get_multiple_accounts(batch).await?; - - let results: Vec<(&Pubkey, Option)> = batch.iter().zip(accounts).collect(); - - Ok::<_, BubblegumOpsErrorKind>(results) - }) - } - - let result = futures::future::try_join_all(gma_handles).await?; - - let trees = result - .into_iter() - .flatten() - .filter_map(|(pubkey, account)| { - account.map(|account| Self::try_from_rpc(*pubkey, account)) - }) - .collect::, _>>() - .map_err(|_| BubblegumOpsErrorKind::SerializeTreeResponse)?; - - Ok(trees) - } -} diff --git a/program_transformers/Cargo.toml b/program_transformers/Cargo.toml index 35bab7a19..54ea735a9 100644 --- a/program_transformers/Cargo.toml +++ b/program_transformers/Cargo.toml @@ -10,7 +10,11 @@ blockbuster = { workspace = true } bs58 = { workspace = true } cadence = { workspace = true } cadence-macros = { workspace = true } -digital_asset_types = { workspace = true, features = ["json_types", "sql_types"] } +das-core = { workspace = true } +digital_asset_types = { workspace = true, features = [ + "json_types", + "sql_types", +] } futures = { workspace = true } heck = { workspace = true } mpl-bubblegum = { workspace = true } diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index edde20ca3..f239f2768 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -15,7 +15,7 @@ use { ProgramParseResult, }, }, - futures::future::BoxFuture, + das_core::{DownloadMetadataInfo, DownloadMetadataNotifier}, sea_orm::{ entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr, SqlxPostgresConnector, TransactionTrait, @@ -52,33 +52,6 @@ pub struct TransactionInfo { pub meta_inner_instructions: Vec, } -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct DownloadMetadataInfo { - asset_data_id: Vec, - uri: String, -} - -impl DownloadMetadataInfo { - pub fn new(asset_data_id: Vec, uri: String) -> Self { - Self { - asset_data_id, - uri: uri.trim().replace('\0', ""), - } - } - - pub fn into_inner(self) -> (Vec, String) { - (self.asset_data_id, self.uri) - } -} - -pub type DownloadMetadataNotifier = Box< - dyn Fn( - DownloadMetadataInfo, - ) -> BoxFuture<'static, Result<(), Box>> - + Sync - + Send, ->; - pub struct ProgramTransformer { storage: DatabaseConnection, download_metadata_notifier: DownloadMetadataNotifier,