diff --git a/Cargo.lock b/Cargo.lock index 4790186d4..bc2c1440a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1697,10 +1697,19 @@ name = "das-core" version = "0.7.2" dependencies = [ "anyhow", + "backon", "cadence", "cadence-macros", "clap 4.4.8", + "figment", + "plerkle_messenger", + "solana-account-decoder", + "solana-client", + "solana-sdk", + "solana-transaction-status", "sqlx", + "thiserror", + "tokio", ] [[package]] diff --git a/core/Cargo.toml b/core/Cargo.toml index d230f883a..d0532684f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -7,8 +7,17 @@ publish.workspace = true [dependencies] anyhow = { workspace = true } +backon = { workspace = true } +solana-account-decoder = { workspace = true } +solana-client = { workspace = true } +solana-sdk = { workspace = true } +solana-transaction-status = { workspace = true } cadence = { workspace = true } cadence-macros = { workspace = true } +thiserror = { workspace = true } +figment = { workspace = true } +plerkle_messenger = { workspace = true } +tokio = { workspace = true } clap = { workspace = true, features = ["derive", "cargo", "env"] } sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] } diff --git a/core/src/lib.rs b/core/src/lib.rs index 7041fb4b3..da6bb050e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,5 +1,9 @@ mod db; mod metrics; +mod plerkle_messenger_queue; +mod solana_rpc; pub use db::*; pub use metrics::*; +pub use plerkle_messenger_queue::*; +pub use solana_rpc::*; diff --git a/core/src/plerkle_messenger_queue.rs b/core/src/plerkle_messenger_queue.rs new file mode 100644 index 000000000..8c30d01c0 --- /dev/null +++ b/core/src/plerkle_messenger_queue.rs @@ -0,0 +1,180 @@ +use anyhow::Result; +use clap::Parser; +use figment::value::{Dict, Value}; +use plerkle_messenger::{ + Messenger, MessengerConfig, MessengerType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM, + TRANSACTION_BACKFILL_STREAM, TRANSACTION_STREAM, +}; +use std::num::TryFromIntError; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::{mpsc::error::TrySendError, Mutex}; + +#[derive(Clone, Debug, Parser)] +pub struct QueueArgs { + #[arg(long, env)] + pub messenger_redis_url: String, + #[arg(long, env, default_value = "100")] + pub messenger_redis_batch_size: String, + #[arg(long, env, default_value = "25")] + pub messenger_queue_connections: u64, +} + +impl From for MessengerConfig { + fn from(args: QueueArgs) -> Self { + let mut connection_config = Dict::new(); + + connection_config.insert( + "redis_connection_str".to_string(), + Value::from(args.messenger_redis_url), + ); + connection_config.insert( + "batch_size".to_string(), + Value::from(args.messenger_redis_batch_size), + ); + connection_config.insert( + "pipeline_size_bytes".to_string(), + Value::from(1u128.to_string()), + ); + + Self { + messenger_type: MessengerType::Redis, + connection_config, + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum QueuePoolError { + #[error("messenger")] + Messenger(#[from] plerkle_messenger::MessengerError), + #[error("tokio try send to channel")] + TrySendMessengerChannel(#[from] TrySendError>), + #[error("revc messenger connection")] + RecvMessengerConnection, + #[error("try from int")] + TryFromInt(#[from] TryFromIntError), + #[error("tokio send to channel")] + SendMessengerChannel(#[from] mpsc::error::SendError>), +} + +#[derive(Debug, Clone)] +pub struct QueuePool { + tx: mpsc::Sender>, + rx: Arc>>>, +} + +impl QueuePool { + pub async fn try_from_config(config: QueueArgs) -> anyhow::Result { + let size = usize::try_from(config.messenger_queue_connections)?; + let (tx, rx) = mpsc::channel(size); + + for _ in 0..config.messenger_queue_connections { + let messenger_config: MessengerConfig = config.clone().into(); + let mut messenger = plerkle_messenger::select_messenger(messenger_config).await?; + + let streams = [ + (plerkle_messenger::ACCOUNT_STREAM, 100_000_000), + (plerkle_messenger::ACCOUNT_BACKFILL_STREAM, 100_000_000), + (plerkle_messenger::SLOT_STREAM, 100_000), + (plerkle_messenger::TRANSACTION_STREAM, 10_000_000), + (plerkle_messenger::TRANSACTION_BACKFILL_STREAM, 10_000_000), + (plerkle_messenger::BLOCK_STREAM, 100_000), + ]; + + for &(key, size) in &streams { + messenger.add_stream(key).await?; + messenger.set_buffer_size(key, size).await; + } + + tx.try_send(messenger)?; + } + + Ok(Self { + tx, + rx: Arc::new(Mutex::new(rx)), + }) + } + + /// Pushes account backfill data to the appropriate stream. + /// + /// This method sends account backfill data to the `ACCOUNT_BACKFILL_STREAM`. + /// It is used for backfilling account information in the system. + /// + /// # Arguments + /// + /// * `bytes` - A byte slice containing the account backfill data to be pushed. + /// + /// # Returns + /// + /// This method returns a `Result` which is `Ok` if the push is successful, + /// or an `Err` with a `QueuePoolError` if the push fails. + pub async fn push_account_backfill(&self, bytes: &[u8]) -> Result<(), QueuePoolError> { + self.push(ACCOUNT_BACKFILL_STREAM, bytes).await + } + + /// Pushes transaction backfill data to the appropriate stream. + /// + /// This method sends transaction backfill data to the `TRANSACTION_BACKFILL_STREAM`. + /// It is used for backfilling transaction information in the system. + /// + /// # Arguments + /// + /// * `bytes` - A byte slice containing the transaction backfill data to be pushed. + /// + /// # Returns + /// + /// This method returns a `Result` which is `Ok` if the push is successful, + /// or an `Err` with a `QueuePoolError` if the push fails. + pub async fn push_transaction_backfill(&self, bytes: &[u8]) -> Result<(), QueuePoolError> { + self.push(TRANSACTION_BACKFILL_STREAM, bytes).await + } + + /// Pushes account data to the appropriate stream. + /// + /// This method sends account data to the `ACCOUNT_STREAM`. + /// It is used for pushing real-time account updates to the system. + /// + /// # Arguments + /// + /// * `bytes` - A byte slice containing the account data to be pushed. + /// + /// # Returns + /// + /// This method returns a `Result` which is `Ok` if the push is successful, + /// or an `Err` with a `QueuePoolError` if the push fails. + pub async fn push_account(&self, bytes: &[u8]) -> Result<(), QueuePoolError> { + self.push(ACCOUNT_STREAM, bytes).await + } + + /// Pushes transaction data to the appropriate stream. + /// + /// This method sends transaction data to the `TRANSACTION_STREAM`. + /// It is used for pushing real-time transaction updates to the system. + /// + /// # Arguments + /// + /// * `bytes` - A byte slice containing the transaction data to be pushed. + /// + /// # Returns + /// + /// This method returns a `Result` which is `Ok` if the push is successful, + /// or an `Err` with a `QueuePoolError` if the push fails. + pub async fn push_transaction(&self, bytes: &[u8]) -> Result<(), QueuePoolError> { + self.push(TRANSACTION_STREAM, bytes).await + } + + async fn push(&self, stream_key: &'static str, bytes: &[u8]) -> Result<(), QueuePoolError> { + let mut rx = self.rx.lock().await; + let mut messenger = rx + .recv() + .await + .ok_or(QueuePoolError::RecvMessengerConnection)?; + + messenger.send(stream_key, bytes).await?; + + self.tx.send(messenger).await?; + + Ok(()) + } +} diff --git a/ops/src/bubblegum/rpc.rs b/core/src/solana_rpc.rs similarity index 84% rename from ops/src/bubblegum/rpc.rs rename to core/src/solana_rpc.rs index 0b4856401..71f86f8a6 100644 --- a/ops/src/bubblegum/rpc.rs +++ b/core/src/solana_rpc.rs @@ -82,6 +82,31 @@ impl Rpc { .await } + pub async fn get_account( + &self, + pubkey: &Pubkey, + ) -> Result< + solana_client::rpc_response::Response>, + ClientError, + > { + (|| async { + self.0 + .get_account_with_config( + pubkey, + RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig { + commitment: CommitmentLevel::Finalized, + }), + ..RpcAccountInfoConfig::default() + }, + ) + .await + }) + .retry(&ExponentialBuilder::default()) + .await + } + pub async fn get_program_accounts( &self, program: &Pubkey, diff --git a/ops/src/account/account_details.rs b/ops/src/account/account_details.rs new file mode 100644 index 000000000..0490f1ca1 --- /dev/null +++ b/ops/src/account/account_details.rs @@ -0,0 +1,30 @@ +use anyhow::Result; +use das_core::Rpc; +use solana_sdk::{account::Account, pubkey::Pubkey}; + +pub struct AccountDetails<'a> { + pub account: Account, + pub slot: u64, + pub pubkey: &'a Pubkey, +} + +impl<'a> AccountDetails<'a> { + pub fn new(account: Account, slot: u64, pubkey: &'a Pubkey) -> Self { + Self { + account, + slot, + pubkey, + } + } + + pub async fn fetch(rpc: &Rpc, pubkey: &'a Pubkey) -> Result { + let account_response = rpc.get_account(pubkey).await?; + let slot = account_response.context.slot; + + let account = account_response + .value + .ok_or_else(|| anyhow::anyhow!("Account not found for pubkey: {}", pubkey))?; + + Ok(Self::new(account, slot, pubkey)) + } +} diff --git a/ops/src/account/cmd.rs b/ops/src/account/cmd.rs new file mode 100644 index 000000000..659758d1c --- /dev/null +++ b/ops/src/account/cmd.rs @@ -0,0 +1,32 @@ +use super::{program, single}; +use anyhow::Result; +use clap::{Args, Subcommand}; + +#[derive(Debug, Clone, Subcommand)] +pub enum Commands { + /// The 'program' command is used to backfill the index against on-chain accounts owned by a program. + #[clap(name = "program")] + Program(program::Args), + /// The 'single' command is used to backfill the index against a single account. + #[clap(name = "single")] + Single(single::Args), +} + +#[derive(Debug, Clone, Args)] +pub struct AccountCommand { + #[clap(subcommand)] + pub action: Commands, +} + +pub async fn subcommand(subcommand: AccountCommand) -> Result<()> { + match subcommand.action { + Commands::Program(args) => { + program::run(args).await?; + } + Commands::Single(args) => { + single::run(args).await?; + } + } + + Ok(()) +} diff --git a/ops/src/account/mod.rs b/ops/src/account/mod.rs new file mode 100644 index 000000000..6eeff799f --- /dev/null +++ b/ops/src/account/mod.rs @@ -0,0 +1,6 @@ +mod account_details; +mod cmd; +mod program; +mod single; + +pub use cmd::*; diff --git a/ops/src/account/program.rs b/ops/src/account/program.rs new file mode 100644 index 000000000..7dd990582 --- /dev/null +++ b/ops/src/account/program.rs @@ -0,0 +1,81 @@ +use anyhow::Result; + +use super::account_details::AccountDetails; +use clap::Parser; +use das_core::{MetricsArgs, QueueArgs, QueuePool, Rpc, SolanaRpcArgs}; +use flatbuffers::FlatBufferBuilder; +use plerkle_serialization::{ + serializer::serialize_account, solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2, +}; +use solana_sdk::pubkey::Pubkey; + +#[derive(Debug, Parser, Clone)] +pub struct Args { + /// Redis configuration + #[clap(flatten)] + pub queue: QueueArgs, + + /// Metrics configuration + #[clap(flatten)] + pub metrics: MetricsArgs, + + /// Solana configuration + #[clap(flatten)] + pub solana: SolanaRpcArgs, + + /// The batch size to use when fetching accounts + #[arg(long, env, default_value = "1000")] + pub batch_size: usize, + + /// The public key of the program to backfill + #[clap(value_parser = parse_pubkey)] + pub program: Pubkey, +} + +fn parse_pubkey(s: &str) -> Result { + Pubkey::try_from(s).map_err(|_| "Failed to parse public key") +} + +pub async fn run(config: Args) -> Result<()> { + let rpc = Rpc::from_config(config.solana); + let queue = QueuePool::try_from_config(config.queue).await?; + + let accounts = rpc.get_program_accounts(&config.program, None).await?; + + let accounts_chunks = accounts.chunks(config.batch_size); + + for batch in accounts_chunks { + let results = futures::future::try_join_all( + batch + .iter() + .map(|(pubkey, _account)| AccountDetails::fetch(&rpc, pubkey)), + ) + .await?; + + for account_detail in results { + let AccountDetails { + account, + slot, + pubkey, + } = account_detail; + let builder = FlatBufferBuilder::new(); + let account_info = ReplicaAccountInfoV2 { + pubkey: &pubkey.to_bytes(), + lamports: account.lamports, + owner: &account.owner.to_bytes(), + executable: account.executable, + rent_epoch: account.rent_epoch, + data: &account.data, + write_version: 0, + txn_signature: None, + }; + + let fbb = serialize_account(builder, &account_info, slot, false); + let bytes = fbb.finished_data(); + + queue.push_account_backfill(bytes).await?; + } + } + + Ok(()) +} diff --git a/ops/src/account/single.rs b/ops/src/account/single.rs new file mode 100644 index 000000000..41269bb05 --- /dev/null +++ b/ops/src/account/single.rs @@ -0,0 +1,61 @@ +use anyhow::Result; + +use super::account_details::AccountDetails; +use clap::Parser; +use das_core::{MetricsArgs, QueueArgs, QueuePool, Rpc, SolanaRpcArgs}; +use flatbuffers::FlatBufferBuilder; +use plerkle_serialization::{ + serializer::serialize_account, solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2, +}; +use solana_sdk::pubkey::Pubkey; + +#[derive(Debug, Parser, Clone)] +pub struct Args { + /// Redis configuration + #[clap(flatten)] + pub queue: QueueArgs, + + /// Metrics configuration + #[clap(flatten)] + pub metrics: MetricsArgs, + + /// Solana configuration + #[clap(flatten)] + pub solana: SolanaRpcArgs, + /// The public key of the account to backfill + #[clap(value_parser = parse_pubkey)] + pub account: Pubkey, +} + +fn parse_pubkey(s: &str) -> Result { + Pubkey::try_from(s).map_err(|_| "Failed to parse public key") +} + +pub async fn run(config: Args) -> Result<()> { + let rpc = Rpc::from_config(config.solana); + let queue = QueuePool::try_from_config(config.queue).await?; + + let AccountDetails { + account, + slot, + pubkey, + } = AccountDetails::fetch(&rpc, &config.account).await?; + let builder = FlatBufferBuilder::new(); + let account_info = ReplicaAccountInfoV2 { + pubkey: &pubkey.to_bytes(), + lamports: account.lamports, + owner: &account.owner.to_bytes(), + executable: account.executable, + rent_epoch: account.rent_epoch, + data: &account.data, + write_version: 0, + txn_signature: None, + }; + + let fbb = serialize_account(builder, &account_info, slot, false); + let bytes = fbb.finished_data(); + + queue.push_account_backfill(bytes).await?; + + Ok(()) +} diff --git a/ops/src/bubblegum/audit.rs b/ops/src/bubblegum/audit.rs index 671aceed0..4da0a2652 100644 --- a/ops/src/bubblegum/audit.rs +++ b/ops/src/bubblegum/audit.rs @@ -1,11 +1,8 @@ -use super::rpc::{Rpc, SolanaRpcArgs}; use anyhow::Result; - use clap::Parser; -use das_core::{connect_db, MetricsArgs, PoolArgs}; +use das_core::{connect_db, MetricsArgs, PoolArgs, Rpc, SolanaRpcArgs}; use digital_asset_types::dao::cl_audits_v2; use futures::future; - use sea_orm::{CursorTrait, EntityTrait, SqlxPostgresConnector}; use solana_sdk::signature::Signature; use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta; diff --git a/ops/src/bubblegum/backfiller.rs b/ops/src/bubblegum/backfiller.rs index d6d00405e..8b0e5ed4f 100644 --- a/ops/src/bubblegum/backfiller.rs +++ b/ops/src/bubblegum/backfiller.rs @@ -1,12 +1,10 @@ -use super::{ - queue::{QueueArgs, QueuePool}, - rpc::{Rpc, SolanaRpcArgs}, - tree::{TreeErrorKind, TreeGapFill, TreeGapModel, TreeResponse}, -}; +use super::tree::{TreeErrorKind, TreeGapFill, TreeGapModel, TreeResponse}; use anyhow::Result; use cadence_macros::{statsd_count, statsd_time}; use clap::Parser; -use das_core::{connect_db, setup_metrics, MetricsArgs, PoolArgs}; +use das_core::{ + connect_db, setup_metrics, MetricsArgs, PoolArgs, QueueArgs, QueuePool, Rpc, SolanaRpcArgs, +}; use digital_asset_types::dao::cl_audits_v2; use flatbuffers::FlatBufferBuilder; use futures::{stream::FuturesUnordered, StreamExt}; @@ -303,7 +301,9 @@ async fn queue_transaction<'a>( let message = seralize_encoded_transaction_with_status(FlatBufferBuilder::new(), transaction)?; - queue.push(message.finished_data()).await?; + queue + .push_transaction_backfill(message.finished_data()) + .await?; Ok(()) } diff --git a/ops/src/bubblegum/mod.rs b/ops/src/bubblegum/mod.rs index a25bc4784..eb4c867ad 100644 --- a/ops/src/bubblegum/mod.rs +++ b/ops/src/bubblegum/mod.rs @@ -1,8 +1,6 @@ mod audit; mod backfiller; mod cmd; -mod queue; -mod rpc; mod tree; pub use cmd::*; diff --git a/ops/src/bubblegum/queue.rs b/ops/src/bubblegum/queue.rs deleted file mode 100644 index 52105316f..000000000 --- a/ops/src/bubblegum/queue.rs +++ /dev/null @@ -1,103 +0,0 @@ -use anyhow::Result; -use clap::Parser; -use figment::value::{Dict, Value}; -use plerkle_messenger::{Messenger, MessengerConfig, MessengerType}; -use std::num::TryFromIntError; -use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::sync::{mpsc::error::TrySendError, Mutex}; - -const TRANSACTION_BACKFILL_STREAM: &str = "TXNFILL"; - -#[derive(Clone, Debug, Parser)] -pub struct QueueArgs { - #[arg(long, env)] - pub messenger_redis_url: String, - #[arg(long, env, default_value = "100")] - pub messenger_redis_batch_size: String, - #[arg(long, env, default_value = "25")] - pub messenger_queue_connections: u64, - #[arg(long, env, default_value = "TXNFILL")] - pub messenger_queue_stream: String, -} - -impl From for MessengerConfig { - fn from(args: QueueArgs) -> Self { - let mut connection_config = Dict::new(); - - connection_config.insert( - "redis_connection_str".to_string(), - Value::from(args.messenger_redis_url), - ); - connection_config.insert( - "batch_size".to_string(), - Value::from(args.messenger_redis_batch_size), - ); - connection_config.insert( - "pipeline_size_bytes".to_string(), - Value::from(1u128.to_string()), - ); - - Self { - messenger_type: MessengerType::Redis, - connection_config, - } - } -} - -#[derive(thiserror::Error, Debug)] -pub enum QueuePoolError { - #[error("messenger")] - Messenger(#[from] plerkle_messenger::MessengerError), - #[error("tokio try send to channel")] - TrySendMessengerChannel(#[from] TrySendError>), - #[error("revc messenger connection")] - RecvMessengerConnection, - #[error("try from int")] - TryFromInt(#[from] TryFromIntError), - #[error("tokio send to channel")] - SendMessengerChannel(#[from] mpsc::error::SendError>), -} - -#[derive(Debug, Clone)] -pub struct QueuePool { - tx: mpsc::Sender>, - rx: Arc>>>, -} - -impl QueuePool { - pub async fn try_from_config(config: QueueArgs) -> anyhow::Result { - let size = usize::try_from(config.messenger_queue_connections)?; - let (tx, rx) = mpsc::channel(size); - - for _ in 0..config.messenger_queue_connections { - let messenger_config: MessengerConfig = config.clone().into(); - let mut messenger = plerkle_messenger::select_messenger(messenger_config).await?; - messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await?; - messenger - .set_buffer_size(TRANSACTION_BACKFILL_STREAM, 10_000_000) - .await; - - tx.try_send(messenger)?; - } - - Ok(Self { - tx, - rx: Arc::new(Mutex::new(rx)), - }) - } - - pub async fn push(&self, message: &[u8]) -> Result<(), QueuePoolError> { - let mut rx = self.rx.lock().await; - let mut messenger = rx - .recv() - .await - .ok_or(QueuePoolError::RecvMessengerConnection)?; - - messenger.send(TRANSACTION_BACKFILL_STREAM, message).await?; - - self.tx.send(messenger).await?; - - Ok(()) - } -} diff --git a/ops/src/bubblegum/tree.rs b/ops/src/bubblegum/tree.rs index 260252907..09a3c92cb 100644 --- a/ops/src/bubblegum/tree.rs +++ b/ops/src/bubblegum/tree.rs @@ -1,6 +1,7 @@ use anyhow::Result; use borsh::BorshDeserialize; use clap::Args; +use das_core::{QueuePoolError, Rpc}; use log::error; use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value}; use solana_client::rpc_filter::{Memcmp, RpcFilterType}; @@ -14,8 +15,6 @@ use std::str::FromStr; use thiserror::Error as ThisError; use tokio::sync::mpsc::Sender; -use super::{queue::QueuePoolError, rpc::Rpc}; - const GET_SIGNATURES_FOR_ADDRESS_LIMIT: usize = 1000; #[derive(Debug, Clone, Args)] diff --git a/ops/src/main.rs b/ops/src/main.rs index 197b130ca..09e73be12 100644 --- a/ops/src/main.rs +++ b/ops/src/main.rs @@ -1,5 +1,7 @@ +mod account; mod bubblegum; +use account::{subcommand as account_subcommand, AccountCommand}; use anyhow::Result; use bubblegum::{subcommand as bubblegum_subcommand, BubblegumCommand}; use clap::{Parser, Subcommand}; @@ -15,6 +17,8 @@ struct Args { enum Command { #[clap(name = "bubblegum")] Bubblegum(BubblegumCommand), + #[clap(name = "account")] + Account(AccountCommand), } #[tokio::main] @@ -25,6 +29,7 @@ async fn main() -> Result<()> { match args.command { Command::Bubblegum(subcommand) => bubblegum_subcommand(subcommand).await?, + Command::Account(subcommand) => account_subcommand(subcommand).await?, } Ok(())