[Tree Backfiller] Metrics and Logging (#115)
* refactor(backfiller): tree backfilling using getSignaturesForAddress; fetch all trees, then fetch each tree's associated transactions (see the sketch after this list)

* feat(backfiller): generate table and model to query the last transaction record for fast-forwarding tree transaction crawling

* feat(backfiller): push transaction payloads to Redis through the plerkle messenger; mark tree transactions with processed_at so we know they completed the index loop

* fix(backfiller): undo git history changes made by running the formatter; include only the changes needed by the backfiller

* fix(backfiller): support the mock feature for sea-orm by switching to a Postgres pool and the sea_orm adapter

* feat(backfiller): emit performance and reliability metrics to statsd

* refactor(backfiller): only backfill trees from finalized blocks

* refactor(backfiller): skip queueing a transaction that has already been processed
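
For context, the crawl described in the first bullet pages backwards through each tree's signature history. A minimal sketch of that loop, assuming solana-client's nonblocking `RpcClient`; the `crawl_tree` helper here is illustrative, not the actual implementation in `tree_backfiller/src/tree.rs`:

```
use solana_client::{
    nonblocking::rpc_client::RpcClient,
    rpc_client::GetConfirmedSignaturesForAddress2Config,
};
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Signature};
use std::str::FromStr;
use tokio::sync::mpsc::Sender;

async fn crawl_tree(
    client: &RpcClient,
    tree: Pubkey,
    sig_sender: Sender<Signature>,
) -> anyhow::Result<()> {
    let mut before: Option<Signature> = None;

    loop {
        // Page backwards through the tree's finalized transaction history.
        let config = GetConfirmedSignaturesForAddress2Config {
            before,
            until: None, // the last processed signature could go here to fast-forward
            limit: Some(1000),
            commitment: Some(CommitmentConfig::finalized()),
        };

        let batch = client
            .get_signatures_for_address_with_config(&tree, config)
            .await?;

        if batch.is_empty() {
            break;
        }

        for status in &batch {
            // Hand each signature off to the transaction workers.
            sig_sender.send(Signature::from_str(&status.signature)?).await?;
        }

        // Resume from the oldest signature seen in this batch.
        before = batch
            .last()
            .map(|s| Signature::from_str(&s.signature))
            .transpose()?;
    }

    Ok(())
}
```

Paging stops once an empty batch comes back; the `until` cursor is where the last-transaction table from the second bullet would plug in to skip already-indexed history.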
kespinola authored Dec 14, 2023
1 parent 1968c00 commit 2980c89
Showing 9 changed files with 276 additions and 54 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


17 changes: 15 additions & 2 deletions migration/src/m20231208_103949_create_tree_transactions_table.rs
@@ -1,11 +1,13 @@
use sea_orm_migration::prelude::*;
use sea_orm_migration::{sea_orm::ConnectionTrait, prelude::*};
use sea_orm::Statement;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
manager
.create_table(
Table::create()
@@ -25,20 +27,31 @@ impl MigrationTrait for Migration {
)
.await?;

let stmt = Statement::from_sql_and_values(
manager.get_database_backend(),
r#"CREATE INDEX signature_processed_at_not_null_index ON tree_transactions (signature, processed_at) WHERE processed_at IS NOT NULL"#,
[]
);

db.execute(stmt).await?;

manager
.create_index(
Index::create()
.name("tree_slot_index")
.table(TreeTransactions::Table)
.col(TreeTransactions::Tree)
.col(TreeTransactions::Slot)
.unique()
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_index(Index::drop().name("signature_processed_at_null_index").table(TreeTransactions::Table).to_owned())
.await?;

manager
.drop_index(Index::drop().name("tree_slot_index").table(TreeTransactions::Table).to_owned())
.await?;
7 changes: 5 additions & 2 deletions tree_backfiller/Cargo.toml
@@ -7,6 +7,7 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

log = "0.4.17"
env_logger = "0.10.0"
anyhow = "1.0.75"
@@ -19,8 +20,12 @@ redis = { version = "0.22.3", features = [
futures = { version = "0.3.25" }
futures-util = "0.3.27"
base64 = "0.21.0"
indicatif = "0.17.5"
thiserror = "1.0.31"
serde_json = "1.0.81"
cadence = "0.29.0"
cadence-macros = "0.29.0"
hyper = "0.14.23"
anchor-client = "0.28.0"
tokio = { version = "1.26.0", features = ["full", "tracing"] }
sqlx = { version = "0.6.2", features = [
@@ -61,8 +66,6 @@ async-trait = "0.1.53"
num-traits = "0.2.15"
blockbuster = "0.9.0-beta.1"
figment = { version = "0.10.6", features = ["env", "toml", "yaml"] }
cadence = "0.29.0"
cadence-macros = "0.29.0"
solana-sdk = "~1.16.16"
solana-client = "~1.16.16"
spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] }
60 changes: 60 additions & 0 deletions tree_backfiller/README.md
@@ -0,0 +1,60 @@



# Tree Backfiller

The Tree Backfiller crawls all trees on-chain and backfills any transactions related to a tree that have not already been observed.

## Commands

Command line arguments can also be set through environment variables.

### Run

The `run` command initiates the crawling and backfilling process. It requires the Solana RPC URL, the database URL, and the messenger Redis URL.

```
Usage: das-tree-backfiller run [OPTIONS] --solana-rpc-url <SOLANA_RPC_URL> --database-url <DATABASE_URL> --messenger-redis-url <MESSENGER_REDIS_URL>
Options:
--solana-rpc-url <SOLANA_RPC_URL>
Solana RPC URL [env: SOLANA_RPC_URL=https://index.rpcpool.com/a4d23a00546272efeba9843a4ae4R]
--tree-crawler-count <TREE_CRAWLER_COUNT>
Number of tree crawler workers [env: TREE_CRAWLER_COUNT=] [default: 100]
--signature-channel-size <SIGNATURE_CHANNEL_SIZE>
The size of the signature channel. This is the number of signatures that can be queued up. [env: SIGNATURE_CHANNEL_SIZE=] [default: 10000]
--queue-channel-size <QUEUE_CHANNEL_SIZE>
[env: QUEUE_CHANNEL_SIZE=] [default: 1000]
--database-url <DATABASE_URL>
[env: DATABASE_URL=postgres://solana:solana@localhost:5432/solana]
--database-max-connections <DATABASE_MAX_CONNECTIONS>
[env: DATABASE_MAX_CONNECTIONS=] [default: 125]
--database-min-connections <DATABASE_MIN_CONNECTIONS>
[env: DATABASE_MIN_CONNECTIONS=] [default: 5]
--messenger-redis-url <MESSENGER_REDIS_URL>
[env: MESSENGER_REDIS_URL=redis://localhost:6379]
--messenger-redis-batch-size <MESSENGER_REDIS_BATCH_SIZE>
[env: MESSENGER_REDIS_BATCH_SIZE=] [default: 100]
--messenger-stream-max-buffer-size <MESSENGER_STREAM_MAX_BUFFER_SIZE>
[env: MESSENGER_STREAM_MAX_BUFFER_SIZE=] [default: 10000000]
--metrics-host <METRICS_HOST>
[env: METRICS_HOST=] [default: 127.0.0.1]
--metrics-port <METRICS_PORT>
[env: METRICS_PORT=] [default: 8125]
-h, --help
Print help
```
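
For example, a local run might look like this (the URLs are placeholders; substitute your own endpoints):

```
das-tree-backfiller run \
  --solana-rpc-url https://api.mainnet-beta.solana.com \
  --database-url postgres://solana:solana@localhost:5432/solana \
  --messenger-redis-url redis://localhost:6379
```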

### Metrics

The Tree Backfiller provides several metrics for monitoring performance and status:

Metric | Description
--- | ---
transaction.workers | Gauge of active worker threads
transaction.failed | Count of failed transactions
transaction.succeeded | Count of transactions successfully pushed to the queue
transaction.queued | Time for a transaction to be queued
tree.crawled | Time to crawl a tree
tree.completed | Count of completed tree crawls
tree.failed | Count of failed tree crawls
job.completed | Time to complete the job
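
These metrics are emitted in statsd format via the `cadence` crate added to `Cargo.toml` above. The `Metrics` type itself is not shown in this diff; the following is a minimal sketch of what such a wrapper could look like, with the constructor, prefix, and error handling as assumptions rather than the actual implementation:

```
use cadence::{Counted, Gauged, StatsdClient, Timed, UdpMetricSink};
use std::net::UdpSocket;
use std::sync::Arc;
use std::time::Duration;

// Hypothetical wrapper; the real Metrics type lives in tree_backfiller/src/metrics.rs.
#[derive(Clone)]
pub struct Metrics(Arc<StatsdClient>);

impl Metrics {
    pub fn new(host: &str, port: u16) -> anyhow::Result<Self> {
        let socket = UdpSocket::bind("0.0.0.0:0")?;
        let sink = UdpMetricSink::from((host, port), socket)?;
        Ok(Self(Arc::new(StatsdClient::from_sink("das.backfiller", sink))))
    }

    // Count a single occurrence, e.g. metrics.increment("tree.failed").
    pub fn increment(&self, key: &str) {
        let _ = self.0.count(key, 1);
    }

    // Record a point-in-time value, e.g. the number of live workers.
    pub fn gauge(&self, key: &str, value: u64) {
        let _ = self.0.gauge(key, value);
    }

    // Record a duration, e.g. metrics.time("tree.crawled", timing.elapsed()).
    pub fn time(&self, key: &str, value: Duration) {
        let _ = self.0.time(key, value);
    }
}
```

Cloning is cheap because the statsd client sits behind an `Arc`, which is why `backfiller.rs` below can hand a clone of `metrics` to every crawler and worker task.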
103 changes: 85 additions & 18 deletions tree_backfiller/src/backfiller.rs
@@ -1,15 +1,21 @@
use crate::db;
use crate::{queue, tree};
use crate::{
metrics::{Metrics, MetricsArgs},
queue, tree,
};

use anyhow::Result;
use clap::Parser;
use log::{debug, error, info};
use indicatif::HumanDuration;
use log::{error, info};
use sea_orm::SqlxPostgresConnector;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::Signature;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::sync::{mpsc, Semaphore};
use tokio::time::Duration;

#[derive(Debug, Parser, Clone)]
pub struct Args {
@@ -18,26 +24,27 @@ pub struct Args {
pub solana_rpc_url: String,

/// Number of tree crawler workers
#[arg(long, env, default_value = "1")]
#[arg(long, env, default_value = "100")]
pub tree_crawler_count: usize,

/// The size of the signature channel. This is the number of signatures that can be queued up. If the channel is full, the crawler will block until there is space in the channel.
#[arg(long, env, default_value = "1")]
#[arg(long, env, default_value = "10000")]
pub signature_channel_size: usize,

#[arg(long, env, default_value = "1")]
#[arg(long, env, default_value = "1000")]
pub queue_channel_size: usize,

#[arg(long, env, default_value = "3000")]
pub transaction_check_timeout: u64,

/// Database configuration
#[clap(flatten)]
pub database: db::PoolArgs,

/// Redis configuration
#[clap(flatten)]
pub queue: queue::QueueArgs,

/// Metrics configuration
#[clap(flatten)]
pub metrics: MetricsArgs,
}

/// A thread-safe counter.
@@ -70,7 +77,6 @@ impl Counter {
let counter = self.clone();
async move {
while counter.get() > 0 {
println!("Counter value: {}", counter.get());
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
@@ -85,18 +91,38 @@ impl Clone for Counter {
}
}

/// Runs the tree backfiller.
/// The main function for running the backfiller.
///
/// This function does the following:
/// 1. Sets up the Solana RPC client and the database connection pool.
/// 2. Initializes the metrics for trees, signatures, and the queue.
/// 3. Creates channels for the queue and signatures.
/// 4. Spawns a new task to handle transactions.
/// 5. Spawns a new task to handle the queue.
/// 6. Fetches all trees and spawns a new task for each tree to crawl it.
/// 7. Waits for all crawling tasks to complete.
/// 8. Waits for the transaction worker count to reach zero.
/// 9. Waits for the queue handler to finish.
/// 10. Logs the total time taken and the number of trees crawled.
///
/// This function takes a `Config` as input and returns a `Result<()>`.
/// It creates an `RpcClient` and retrieves all trees.
/// It then spawns a thread for each tree and a separate thread to handle transaction workers.
/// The function waits for all threads to finish before returning.
/// # Arguments
///
/// * `config` - The configuration arguments for the backfiller.
///
/// # Returns
///
/// * `Result<()>` - Returns `Ok(())` if the function runs successfully. Returns an error otherwise.
pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Arc::new(RpcClient::new(config.solana_rpc_url));
let sig_solana_rpc = Arc::clone(&solana_rpc);

let pool = db::connect(config.database).await?;

let metrics = Metrics::try_from_config(config.metrics)?;
let tree_metrics = metrics.clone();
let signature_metrics = metrics.clone();
let queue_metrics = metrics.clone();

let (queue_sender, mut queue_receiver) = mpsc::channel::<Vec<u8>>(config.queue_channel_size);
let (sig_sender, mut sig_receiver) = mpsc::channel::<Signature>(config.signature_channel_size);

@@ -110,15 +136,28 @@ pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Arc::clone(&sig_solana_rpc);
let transaction_worker_count_sig = transaction_worker_count.clone();
let queue_sender = queue_sender.clone();
let metrics = signature_metrics.clone();

transaction_worker_count_sig.increment();

if let Ok(transaction_workers_running) = u64::try_from(transaction_worker_count_sig.get()) {
metrics.gauge("transaction.workers", transaction_workers_running);
}

let transaction_task = async move {
let timing = Instant::now();
if let Err(e) = tree::transaction(solana_rpc, queue_sender, signature).await {
metrics.increment("transaction.failed");
error!("retrieving transaction: {:?}", e);
}

transaction_worker_count_sig.decrement();

if let Ok(transaction_workers_running) = u64::try_from(transaction_worker_count_sig.get()) {
metrics.gauge("transaction.workers", transaction_workers_running);
}

metrics.time("transaction.queued", timing.elapsed());
};

tokio::spawn(transaction_task);
@@ -135,32 +174,52 @@ pub async fn run(config: Args) -> Result<()> {

while let Some(data) = queue_receiver.recv().await {
if let Err(e) = queue.push(&data).await {
queue_metrics.increment("transaction.failed");
error!("pushing to queue: {:?}", e);
} else {
queue_metrics.increment("transaction.succeeded");
}
}

Ok::<(), anyhow::Error>(())
});

let started = Instant::now();

let trees = tree::all(&solana_rpc).await?;
let tree_count = trees.len();

info!(
"fetched {} trees in {}",
tree_count,
HumanDuration(started.elapsed())
);

let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count));
let mut crawl_handlers = Vec::with_capacity(trees.len());
let mut crawl_handlers = Vec::with_capacity(tree_count);

for tree in trees {
let solana_rpc = Arc::clone(&solana_rpc);
let client = Arc::clone(&solana_rpc);
let semaphore = Arc::clone(&semaphore);
let sig_sender = sig_sender.clone();
let pool = pool.clone();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);
let metrics = tree_metrics.clone();

let crawl_handler = tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

if let Err(e) = tree.crawl(solana_rpc, sig_sender, conn).await {
let timing = Instant::now();

if let Err(e) = tree.crawl(client, sig_sender, conn).await {
metrics.increment("tree.failed");
error!("crawling tree: {:?}", e);
} else {
metrics.increment("tree.completed");
}

metrics.time("tree.crawled", timing.elapsed());

Ok::<(), anyhow::Error>(())
});

@@ -171,5 +230,13 @@ pub async fn run(config: Args) -> Result<()> {
transaction_worker_count_check.zero().await;
let _ = queue_handler.await?;

metrics.time("job.completed", started.elapsed());

info!(
"crawled {} trees in {}",
tree_count,
HumanDuration(started.elapsed())
);

Ok(())
}
1 change: 1 addition & 0 deletions tree_backfiller/src/main.rs
@@ -1,5 +1,6 @@
mod backfiller;
mod db;
mod metrics;
mod queue;
mod tree;

