diff --git a/Cargo.lock b/Cargo.lock index 498015932..c376e7dad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1578,6 +1578,8 @@ dependencies = [ "flatbuffers", "futures", "futures-util", + "hyper", + "indicatif", "lazy_static", "log", "mpl-bubblegum", diff --git a/migration/src/m20231208_103949_create_tree_transactions_table.rs b/migration/src/m20231208_103949_create_tree_transactions_table.rs index c30061516..1889e242a 100644 --- a/migration/src/m20231208_103949_create_tree_transactions_table.rs +++ b/migration/src/m20231208_103949_create_tree_transactions_table.rs @@ -1,4 +1,5 @@ -use sea_orm_migration::prelude::*; +use sea_orm_migration::{sea_orm::ConnectionTrait, prelude::*}; +use sea_orm::Statement; #[derive(DeriveMigrationName)] pub struct Migration; @@ -6,6 +7,7 @@ pub struct Migration; #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + let db = manager.get_connection(); manager .create_table( Table::create() @@ -25,6 +27,14 @@ impl MigrationTrait for Migration { ) .await?; + let stmt = Statement::from_sql_and_values( + manager.get_database_backend(), + r#"CREATE INDEX signature_processed_at_not_null_index ON tree_transactions (signature, processed_at) WHERE processed_at IS NOT NULL"#, + [] + ); + + db.execute(stmt).await?; + manager .create_index( Index::create() @@ -32,13 +42,16 @@ impl MigrationTrait for Migration { .table(TreeTransactions::Table) .col(TreeTransactions::Tree) .col(TreeTransactions::Slot) - .unique() .to_owned(), ) .await } async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_index(Index::drop().name("signature_processed_at_not_null_index").table(TreeTransactions::Table).to_owned()) + .await?; + manager .drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned()) .await?; diff --git a/tree_backfiller/Cargo.toml b/tree_backfiller/Cargo.toml index 4d4b1285d..b91d90639 100644 --- 
a/tree_backfiller/Cargo.toml +++ b/tree_backfiller/Cargo.toml @@ -7,6 +7,7 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] + log = "0.4.17" env_logger = "0.10.0" anyhow = "1.0.75" @@ -19,8 +20,12 @@ redis = { version = "0.22.3", features = [ futures = { version = "0.3.25" } futures-util = "0.3.27" base64 = "0.21.0" +indicatif = "0.17.5" thiserror = "1.0.31" serde_json = "1.0.81" +cadence = "0.29.0" +cadence-macros = "0.29.0" +hyper = "0.14.23" anchor-client = "0.28.0" tokio = { version = "1.26.0", features = ["full", "tracing"] } sqlx = { version = "0.6.2", features = [ @@ -61,8 +66,6 @@ async-trait = "0.1.53" num-traits = "0.2.15" blockbuster = "0.9.0-beta.1" figment = { version = "0.10.6", features = ["env", "toml", "yaml"] } -cadence = "0.29.0" -cadence-macros = "0.29.0" solana-sdk = "~1.16.16" solana-client = "~1.16.16" spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] } diff --git a/tree_backfiller/README.md b/tree_backfiller/README.md new file mode 100644 index 000000000..a24902b27 --- /dev/null +++ b/tree_backfiller/README.md @@ -0,0 +1,61 @@ + + + +# Tree Backfiller + +The Tree Backfiller crawls all trees on-chain and backfills any transactions related to a tree that have not already been observed. + +## Commands + +Command line arguments can also be set through environment variables. + +### Run + +The `run` command initiates the crawling and backfilling process. It requires the Solana RPC URL, the database URL, and the messenger Redis URL. + +``` +Usage: das-tree-backfiller run [OPTIONS] --solana-rpc-url <SOLANA_RPC_URL> --database-url <DATABASE_URL> --messenger-redis-url <MESSENGER_REDIS_URL> + +Options: + --solana-rpc-url <SOLANA_RPC_URL> + Solana RPC URL [env: SOLANA_RPC_URL=https://index.rpcpool.com/a4d23a00546272efeba9843a4ae4R] + --tree-crawler-count <TREE_CRAWLER_COUNT> + Number of tree crawler workers [env: TREE_CRAWLER_COUNT=] [default: 100] + --signature-channel-size <SIGNATURE_CHANNEL_SIZE> + The size of the signature channel. 
This is the number of signatures that can be queued up. [env: SIGNATURE_CHANNEL_SIZE=] [default: 10000] + --queue-channel-size <QUEUE_CHANNEL_SIZE> + [env: QUEUE_CHANNEL_SIZE=] [default: 1000] + --database-url <DATABASE_URL> + [env: DATABASE_URL=postgres://solana:solana@localhost:5432/solana] + --database-max-connections <DATABASE_MAX_CONNECTIONS> + [env: DATABASE_MAX_CONNECTIONS=] [default: 125] + --database-min-connections <DATABASE_MIN_CONNECTIONS> + [env: DATABASE_MIN_CONNECTIONS=] [default: 5] + --messenger-redis-url <MESSENGER_REDIS_URL> + [env: MESSENGER_REDIS_URL=redis://localhost:6379] + --messenger-redis-batch-size <MESSENGER_REDIS_BATCH_SIZE> + [env: MESSENGER_REDIS_BATCH_SIZE=] [default: 100] + --messenger-stream-max-buffer-size <MESSENGER_STREAM_MAX_BUFFER_SIZE> + [env: MESSENGER_STREAM_MAX_BUFFER_SIZE=] [default: 10000000] + --metrics-host <METRICS_HOST> + [env: METRICS_HOST=] [default: 127.0.0.1] + --metrics-port <METRICS_PORT> + [env: METRICS_PORT=] [default: 8125] + -h, --help + Print help +``` + +### Metrics + +The Tree Backfiller provides several metrics for monitoring performance and status: + +Metric | Description +--- | --- +transaction.workers | Gauge of active worker threads +transaction.failed | Count of failed transactions +transaction.succeeded | Count of transactions pushed to the queue +transaction.queued | Time for a transaction to be queued +tree.crawled | Time to crawl a tree +tree.completed | Count of completed tree crawls +tree.failed | Count of failed tree crawls +job.completed | Time to complete the job diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs index ad4d34cff..98d9c9882 100644 --- a/tree_backfiller/src/backfiller.rs +++ b/tree_backfiller/src/backfiller.rs @@ -1,15 +1,21 @@ use crate::db; -use crate::{queue, tree}; +use crate::{ + metrics::{Metrics, MetricsArgs}, + queue, tree, +}; + use anyhow::Result; use clap::Parser; -use log::{debug, error, info}; +use indicatif::HumanDuration; +use log::{error, info}; use sea_orm::SqlxPostgresConnector; use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::signature::Signature; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; use 
tokio::sync::{mpsc, Semaphore}; -use tokio::time::Duration; #[derive(Debug, Parser, Clone)] pub struct Args { @@ -18,19 +24,16 @@ pub struct Args { pub solana_rpc_url: String, /// Number of tree crawler workers - #[arg(long, env, default_value = "1")] + #[arg(long, env, default_value = "100")] pub tree_crawler_count: usize, /// The size of the signature channel. This is the number of signatures that can be queued up. If the channel is full, the crawler will block until there is space in the channel. - #[arg(long, env, default_value = "1")] + #[arg(long, env, default_value = "10000")] pub signature_channel_size: usize, - #[arg(long, env, default_value = "1")] + #[arg(long, env, default_value = "1000")] pub queue_channel_size: usize, - #[arg(long, env, default_value = "3000")] - pub transaction_check_timeout: u64, - /// Database configuration #[clap(flatten)] pub database: db::PoolArgs, @@ -38,6 +41,10 @@ pub struct Args { /// Redis configuration #[clap(flatten)] pub queue: queue::QueueArgs, + + /// Metrics configuration + #[clap(flatten)] + pub metrics: MetricsArgs, } /// A thread-safe counter. @@ -70,7 +77,6 @@ impl Counter { let counter = self.clone(); async move { while counter.get() > 0 { - println!("Counter value: {}", counter.get()); tokio::time::sleep(Duration::from_millis(100)).await; } } @@ -85,18 +91,38 @@ impl Clone for Counter { } } -/// Runs the tree backfiller. +/// The main function for running the backfiller. +/// +/// This function does the following: +/// 1. Sets up the Solana RPC client and the database connection pool. +/// 2. Initializes the metrics for trees, signatures, and the queue. +/// 3. Creates channels for the queue and signatures. +/// 4. Spawns a new task to handle transactions. +/// 5. Spawns a new task to handle the queue. +/// 6. Fetches all trees and spawns a new task for each tree to crawl it. +/// 7. Waits for all crawling tasks to complete. +/// 8. Waits for the transaction worker count to reach zero. +/// 9. 
Waits for the queue handler to finish. +/// 10. Logs the total time taken and the number of trees crawled. /// -/// This function takes a `Config` as input and returns a `Result<()>`. -/// It creates an `RpcClient` and retrieves all trees. -/// It then spawns a thread for each tree and a separate thread to handle transaction workers. -/// The function waits for all threads to finish before returning. +/// # Arguments +/// +/// * `config` - The configuration arguments for the backfiller. +/// +/// # Returns +/// +/// * `Result<()>` - Returns `Ok(())` if the function runs successfully. Returns an error otherwise. pub async fn run(config: Args) -> Result<()> { let solana_rpc = Arc::new(RpcClient::new(config.solana_rpc_url)); let sig_solana_rpc = Arc::clone(&solana_rpc); let pool = db::connect(config.database).await?; + let metrics = Metrics::try_from_config(config.metrics)?; + let tree_metrics = metrics.clone(); + let signature_metrics = metrics.clone(); + let queue_metrics = metrics.clone(); + let (queue_sender, mut queue_receiver) = mpsc::channel::<Vec<u8>>(config.queue_channel_size); let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size); @@ -110,15 +136,28 @@ pub async fn run(config: Args) -> Result<()> { let solana_rpc = Arc::clone(&sig_solana_rpc); let transaction_worker_count_sig = transaction_worker_count.clone(); let queue_sender = queue_sender.clone(); + let metrics = signature_metrics.clone(); transaction_worker_count_sig.increment(); + if let Ok(transaction_workers_running) = u64::try_from(transaction_worker_count_sig.get()) { + metrics.gauge("transaction.workers", transaction_workers_running); + } + let transaction_task = async move { + let timing = Instant::now(); if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await { + metrics.increment("transaction.failed"); error!("retrieving transaction: {:?}", e); } transaction_worker_count_sig.decrement(); + + if let Ok(transaction_workers_running) = 
u64::try_from(transaction_worker_count_sig.get()) { + metrics.gauge("transaction.workers", transaction_workers_running); + } + + metrics.time("transaction.queued", timing.elapsed()); }; tokio::spawn(transaction_task); @@ -135,32 +174,52 @@ pub async fn run(config: Args) -> Result<()> { while let Some(data) = queue_receiver.recv().await { if let Err(e) = queue.push(&data).await { + queue_metrics.increment("transaction.failed"); error!("pushing to queue: {:?}", e); + } else { + queue_metrics.increment("transaction.succeeded"); } } Ok::<(), anyhow::Error>(()) }); + let started = Instant::now(); + let trees = tree::all(&solana_rpc).await?; + let tree_count = trees.len(); + + info!( + "fetched {} trees in {}", + tree_count, + HumanDuration(started.elapsed()) + ); let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count)); - let mut crawl_handlers = Vec::with_capacity(trees.len()); + let mut crawl_handlers = Vec::with_capacity(tree_count); for tree in trees { - let solana_rpc = Arc::clone(&solana_rpc); + let client = Arc::clone(&solana_rpc); let semaphore = Arc::clone(&semaphore); let sig_sender = sig_sender.clone(); let pool = pool.clone(); let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + let metrics = tree_metrics.clone(); let crawl_handler = tokio::spawn(async move { let _permit = semaphore.acquire().await?; - if let Err(e) = tree.crawl(solana_rpc, sig_sender, conn).await { + let timing = Instant::now(); + + if let Err(e) = tree.crawl(client, sig_sender, conn).await { + metrics.increment("tree.failed"); error!("crawling tree: {:?}", e); + } else { + metrics.increment("tree.completed"); } + metrics.time("tree.crawled", timing.elapsed()); + Ok::<(), anyhow::Error>(()) }); @@ -171,5 +230,13 @@ pub async fn run(config: Args) -> Result<()> { transaction_worker_count_check.zero().await; let _ = queue_handler.await?; + metrics.time("job.completed", started.elapsed()); + + info!( + "crawled {} trees in {}", + tree_count, + 
HumanDuration(started.elapsed()) + ); + Ok(()) } diff --git a/tree_backfiller/src/main.rs b/tree_backfiller/src/main.rs index 2b5da240b..83545228f 100644 --- a/tree_backfiller/src/main.rs +++ b/tree_backfiller/src/main.rs @@ -1,5 +1,6 @@ mod backfiller; mod db; +mod metrics; mod queue; mod tree; diff --git a/tree_backfiller/src/metrics.rs b/tree_backfiller/src/metrics.rs new file mode 100644 index 000000000..e8789888b --- /dev/null +++ b/tree_backfiller/src/metrics.rs @@ -0,0 +1,57 @@ +use anyhow::Result; +use cadence::{BufferedUdpMetricSink, Counted, Gauged, QueuingMetricSink, StatsdClient, Timed}; +use clap::Parser; +use log::error; +use std::time::Duration; +use std::{net::UdpSocket, sync::Arc}; + +const METRICS_PREFIX: &str = "das.backfiller"; + +#[derive(Clone, Parser, Debug)] +pub struct MetricsArgs { + #[arg(long, env, default_value = "127.0.0.1")] + pub metrics_host: String, + #[arg(long, env, default_value = "8125")] + pub metrics_port: u16, +} + +/// Handle for reporting metrics to a StatsD endpoint; clones share one underlying client. +#[derive(Clone, Debug)] +pub struct Metrics(Arc<StatsdClient>); + +impl Metrics { + /// Builds a client that sends `das.backfiller`-prefixed metrics over UDP to the configured host and port. + pub fn try_from_config(config: MetricsArgs) -> Result<Self> { + let host = (config.metrics_host, config.metrics_port); + + let socket = UdpSocket::bind("0.0.0.0:0")?; + socket.set_nonblocking(true)?; + + let udp_sink = BufferedUdpMetricSink::from(host, socket)?; + let queuing_sink = QueuingMetricSink::from(udp_sink); + let client = StatsdClient::from_sink(METRICS_PREFIX, queuing_sink); + + Ok(Metrics(Arc::new(client))) + } + + /// Records a timing for `key`; a submission error is logged rather than returned. + pub fn time(&self, key: &str, duration: Duration) { + if let Err(e) = self.0.time(key, duration) { + error!("submitting time: {:?}", e) + } + } + + /// Sets the gauge `key` to `amount`; a submission error is logged rather than returned. + pub fn gauge(&self, key: &str, amount: u64) { + if let Err(e) = self.0.gauge(key, amount) { + error!("submitting gauge: {:?}", e) + } + } + + /// Adds one to the counter `key`; a submission error is logged rather than returned. + pub fn increment(&self, key: &str) { + if let Err(e) = self.0.count(key, 1) { + error!("submitting increment: {:?}", e) + } + } +} diff --git a/tree_backfiller/src/queue.rs b/tree_backfiller/src/queue.rs index 215115974..ceef75a66 100644 --- 
a/tree_backfiller/src/queue.rs +++ b/tree_backfiller/src/queue.rs @@ -5,7 +5,7 @@ use plerkle_messenger::{ redis_messenger::RedisMessenger, Messenger, MessengerConfig, MessengerError, MessengerType, }; -const TRANSACTION_BACKFILL_STREAM: &'static str = "TXNFILL"; +const TRANSACTION_BACKFILL_STREAM: &str = "TXNFILL"; #[derive(Clone, Debug, Parser)] pub struct QueueArgs { @@ -33,7 +33,7 @@ impl From<QueueArgs> for MessengerConfig { Self { messenger_type: MessengerType::Redis, - connection_config: connection_config, + connection_config, } } } diff --git a/tree_backfiller/src/tree.rs b/tree_backfiller/src/tree.rs index f3214a105..328e25c94 100644 --- a/tree_backfiller/src/tree.rs +++ b/tree_backfiller/src/tree.rs @@ -1,9 +1,9 @@ -use crate::queue::Queue; use anyhow::Result; use borsh::BorshDeserialize; use clap::Args; use digital_asset_types::dao::tree_transactions; use flatbuffers::FlatBufferBuilder; +use log::info; use plerkle_serialization::serializer::seralize_encoded_transaction_with_status; use sea_orm::{ sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, @@ -16,7 +16,12 @@ use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionConfig}, rpc_filter::{Memcmp, RpcFilterType}, }; -use solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature}; +use solana_sdk::{ + account::Account, + commitment_config::{CommitmentConfig, CommitmentLevel}, + pubkey::Pubkey, + signature::Signature, +}; use solana_transaction_status::UiTransactionEncoding; use spl_account_compression::id; use spl_account_compression::state::{ @@ -29,7 +34,7 @@ use tokio::sync::mpsc::Sender; const GET_SIGNATURES_FOR_ADDRESS_LIMIT: usize = 1000; -#[derive(Debug, Clone, Default, Args)] +#[derive(Debug, Clone, Args)] pub struct ConfigBackfiller { /// Solana RPC URL #[arg(long, env)] @@ -94,30 +99,6 @@ impl TreeResponse { tree_header, }) } -} - -pub async fn all(client: &Arc<RpcClient>) -> Result<Vec<TreeResponse>, TreeErrorKind> { - let config = 
RpcProgramAccountsConfig { - filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( - 0, - vec![1u8], - ))]), - account_config: RpcAccountInfoConfig { - encoding: Some(UiAccountEncoding::Base64), - ..RpcAccountInfoConfig::default() - }, - ..RpcProgramAccountsConfig::default() - }; - - Ok(client - .get_program_accounts_with_config(&id(), config) - .await? - .into_iter() - .filter_map(|(pubkey, account)| TreeResponse::try_from_rpc(pubkey, account).ok()) - .collect()) -} - -impl TreeResponse { pub async fn crawl( &self, client: Arc<RpcClient>, @@ -131,8 +112,7 @@ impl TreeResponse { .order_by_desc(tree_transactions::Column::Slot) .one(&conn) .await? - .map(|t| Signature::from_str(&t.signature).ok()) - .flatten(); + .and_then(|t| Signature::from_str(&t.signature).ok()); loop { let sigs = client @@ -141,6 +121,9 @@ impl TreeResponse { GetConfirmedSignaturesForAddress2Config { before, until, + commitment: Some(CommitmentConfig { + commitment: CommitmentLevel::Finalized, + }), ..GetConfirmedSignaturesForAddress2Config::default() }, ) @@ -150,6 +133,22 @@ impl TreeResponse { let slot = i64::try_from(sig.slot)?; let sig = Signature::from_str(&sig.signature)?; + let tree_transaction_processed = tree_transactions::Entity::find() + .filter( + tree_transactions::Column::Signature + .eq(sig.to_string()) + .and(tree_transactions::Column::ProcessedAt.is_not_null()), + ) + .one(&conn) + .await?; + + if tree_transaction_processed.is_some() { + info!("skipping previously processed transaction {}", sig); + // Advance the cursor so a fully processed page is not requested again. + before = Some(sig); + continue; + } + let tree_transaction = tree_transactions::ActiveModel { signature: Set(sig.to_string()), tree: Set(self.pubkey.as_ref().to_vec()), @@ -166,7 +165,7 @@ impl TreeResponse { .exec(&conn) .await?; - sender.send(sig.clone()).await?; + sender.send(sig).await?; before = Some(sig); } @@ -180,6 +179,30 @@ impl TreeResponse { } } +pub async fn all(client: &Arc<RpcClient>) -> Result<Vec<TreeResponse>, TreeErrorKind> { + let config = RpcProgramAccountsConfig { + filters: 
Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![1u8], + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig { + commitment: CommitmentLevel::Finalized, + }), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }; + + Ok(client + .get_program_accounts_with_config(&id(), config) + .await? + .into_iter() + .filter_map(|(pubkey, account)| TreeResponse::try_from_rpc(pubkey, account).ok()) + .collect()) +} + pub async fn transaction<'a>( client: Arc<RpcClient>, sender: Sender<Vec<u8>>, @@ -191,6 +214,9 @@ pub async fn transaction<'a>( RpcTransactionConfig { encoding: Some(UiTransactionEncoding::Base58), max_supported_transaction_version: Some(0), + commitment: Some(CommitmentConfig { + commitment: CommitmentLevel::Finalized, + }), ..RpcTransactionConfig::default() }, )