Skip to content

Commit

Permalink
refactor(backfiller): add retries to rpc calls (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola authored Dec 19, 2023
1 parent 1548bd9 commit 5b6a17f
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 91 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tree_backfiller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ publish = false

[dependencies]

backon = "0.4.1"
log = "0.4.17"
env_logger = "0.10.0"
anyhow = "1.0.75"
Expand Down
31 changes: 15 additions & 16 deletions tree_backfiller/src/backfiller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::db;
use crate::{
metrics::{Metrics, MetricsArgs},
queue, tree,
queue,
rpc::{Rpc, SolanaRpcArgs},
tree,
};

use anyhow::Result;
Expand All @@ -10,11 +12,7 @@ use digital_asset_types::dao::tree_transactions;
use indicatif::HumanDuration;
use log::{error, info};
use sea_orm::SqlxPostgresConnector;
use sea_orm::{
sea_query::OnConflict, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait,
QueryFilter, QueryOrder,
};
use solana_client::nonblocking::rpc_client::RpcClient;
use sea_orm::{sea_query::OnConflict, EntityTrait};
use solana_sdk::signature::Signature;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand All @@ -25,10 +23,6 @@ use tokio::sync::{mpsc, Semaphore};

#[derive(Debug, Parser, Clone)]
pub struct Args {
/// Solana RPC URL
#[arg(long, env)]
pub solana_rpc_url: String,

/// Number of tree crawler workers
#[arg(long, env, default_value = "20")]
pub tree_crawler_count: usize,
Expand All @@ -54,6 +48,10 @@ pub struct Args {
/// Metrics configuration
#[clap(flatten)]
pub metrics: MetricsArgs,

/// Solana configuration
#[clap(flatten)]
pub solana: SolanaRpcArgs,
}

/// A thread-safe counter.
Expand Down Expand Up @@ -119,8 +117,8 @@ impl Clone for Counter {
/// This function returns a `Result` which is `Ok` if the backfilling process completes
/// successfully, or an `Error` if any part of the process fails.
pub async fn run(config: Args) -> Result<()> {
let solana_rpc = Arc::new(RpcClient::new(config.solana_rpc_url));
let transaction_solana_rpc = Arc::clone(&solana_rpc);
let solana_rpc = Rpc::from_config(config.solana);
let transaction_solana_rpc = solana_rpc.clone();

let pool = db::connect(config.database).await?;
let transaction_pool = pool.clone();
Expand All @@ -144,14 +142,15 @@ pub async fn run(config: Args) -> Result<()> {
let solana_rpc = transaction_solana_rpc.clone();
let metrics = transaction_metrics.clone();
let queue = queue.clone();
let semaphore = semaphore.clone();
let pool = transaction_pool.clone();
let count = transaction_worker_transaction_count.clone();

count.increment();

let _permit = semaphore.acquire().await?;

tokio::spawn(async move {
let _permit = semaphore.acquire().await?;

let timing = Instant::now();
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool);

Expand All @@ -167,7 +166,7 @@ pub async fn run(config: Args) -> Result<()> {
if let Ok(tree_transaction) = inserted_tree_transaction {
let signature = Signature::from_str(&tree_transaction.signature)?;

if let Err(e) = tree::transaction(solana_rpc, queue, signature).await {
if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await {
error!("tree transaction: {:?}", e);
metrics.increment("transaction.failed");
} else {
Expand Down Expand Up @@ -217,7 +216,7 @@ pub async fn run(config: Args) -> Result<()> {

let timing = Instant::now();

if let Err(e) = tree.crawl(client, sig_sender, conn).await {
if let Err(e) = tree.crawl(&client, sig_sender, conn).await {
metrics.increment("tree.failed");
error!("crawling tree: {:?}", e);
} else {
Expand Down
1 change: 1 addition & 0 deletions tree_backfiller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod backfiller;
mod db;
mod metrics;
mod queue;
mod rpc;
mod tree;

use anyhow::Result;
Expand Down
136 changes: 136 additions & 0 deletions tree_backfiller/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use anyhow::Result;
use backon::ExponentialBuilder;
use backon::Retryable;
use clap::Parser;
use solana_account_decoder::UiAccountEncoding;
use solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature;
use solana_client::{
client_error::ClientError,
nonblocking::rpc_client::RpcClient,
rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcTransactionConfig},
rpc_filter::RpcFilterType,
};
use solana_sdk::{
account::Account,
commitment_config::{CommitmentConfig, CommitmentLevel},
pubkey::Pubkey,
signature::Signature,
};
use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta;
use solana_transaction_status::UiTransactionEncoding;
use std::sync::Arc;

#[derive(Clone, Parser, Debug)]
pub struct SolanaRpcArgs {
#[arg(long, env)]
pub solana_rpc_url: String,
}

#[derive(Clone)]
pub struct Rpc(Arc<RpcClient>);

impl Rpc {
pub fn from_config(config: SolanaRpcArgs) -> Self {
Rpc(Arc::new(RpcClient::new(config.solana_rpc_url)))
}

pub async fn get_transaction(
&self,
signature: &Signature,
) -> Result<EncodedConfirmedTransactionWithStatusMeta, ClientError> {
(|| async {
self.0
.get_transaction_with_config(
signature,
RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Base58),
max_supported_transaction_version: Some(0),
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
..RpcTransactionConfig::default()
},
)
.await
})
.retry(&ExponentialBuilder::default())
.await
}

pub async fn get_signatures_for_address(
&self,
pubkey: &Pubkey,
before: Option<Signature>,
until: Option<Signature>,
) -> Result<Vec<RpcConfirmedTransactionStatusWithSignature>, ClientError> {
(|| async {
self.0
.get_signatures_for_address_with_config(
pubkey,
GetConfirmedSignaturesForAddress2Config {
before,
until,
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
..GetConfirmedSignaturesForAddress2Config::default()
},
)
.await
})
.retry(&ExponentialBuilder::default())
.await
}

pub async fn get_program_accounts(
&self,
program: &Pubkey,
filters: Option<Vec<RpcFilterType>>,
) -> Result<Vec<(Pubkey, Account)>, ClientError> {
(|| async {
let filters = filters.clone();

self.0
.get_program_accounts_with_config(
program,
RpcProgramAccountsConfig {
filters,
account_config: RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
..RpcAccountInfoConfig::default()
},
..RpcProgramAccountsConfig::default()
},
)
.await
})
.retry(&ExponentialBuilder::default())
.await
}

pub async fn get_multiple_accounts(
&self,
pubkeys: &[Pubkey],
) -> Result<Vec<Option<Account>>, ClientError> {
Ok((|| async {
self.0
.get_multiple_accounts_with_config(
pubkeys,
RpcAccountInfoConfig {
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
..RpcAccountInfoConfig::default()
},
)
.await
})
.retry(&ExponentialBuilder::default())
.await?
.value)
}
}
Loading

0 comments on commit 5b6a17f

Please sign in to comment.