Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account Backfill By Program #139

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,17 @@ publish.workspace = true

[dependencies]
anyhow = { workspace = true }
backon = { workspace = true }
solana-account-decoder = { workspace = true }
solana-client = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
cadence = { workspace = true }
cadence-macros = { workspace = true }
thiserror = { workspace = true }
figment = { workspace = true }
plerkle_messenger = { workspace = true }
tokio = { workspace = true }
clap = { workspace = true, features = ["derive", "cargo", "env"] }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }

Expand Down
4 changes: 4 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
mod db;
mod metrics;
mod plerkle_messenger_queue;
mod solana_rpc;

pub use db::*;
pub use metrics::*;
pub use plerkle_messenger_queue::*;
pub use solana_rpc::*;
180 changes: 180 additions & 0 deletions core/src/plerkle_messenger_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
use anyhow::Result;
use clap::Parser;
use figment::value::{Dict, Value};
use plerkle_messenger::{
Messenger, MessengerConfig, MessengerType, ACCOUNT_BACKFILL_STREAM, ACCOUNT_STREAM,
TRANSACTION_BACKFILL_STREAM, TRANSACTION_STREAM,
};
use std::num::TryFromIntError;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::{mpsc::error::TrySendError, Mutex};

#[derive(Clone, Debug, Parser)]
pub struct QueueArgs {
#[arg(long, env)]
pub messenger_redis_url: String,
#[arg(long, env, default_value = "100")]
pub messenger_redis_batch_size: String,
#[arg(long, env, default_value = "25")]
pub messenger_queue_connections: u64,
}

impl From<QueueArgs> for MessengerConfig {
fn from(args: QueueArgs) -> Self {
let mut connection_config = Dict::new();

connection_config.insert(
"redis_connection_str".to_string(),
Value::from(args.messenger_redis_url),
);
connection_config.insert(
"batch_size".to_string(),
Value::from(args.messenger_redis_batch_size),
);
connection_config.insert(
"pipeline_size_bytes".to_string(),
Value::from(1u128.to_string()),
);

Self {
messenger_type: MessengerType::Redis,
connection_config,
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum QueuePoolError {
#[error("messenger")]
Messenger(#[from] plerkle_messenger::MessengerError),
#[error("tokio try send to channel")]
TrySendMessengerChannel(#[from] TrySendError<Box<dyn Messenger>>),
#[error("revc messenger connection")]
RecvMessengerConnection,
#[error("try from int")]
TryFromInt(#[from] TryFromIntError),
#[error("tokio send to channel")]
SendMessengerChannel(#[from] mpsc::error::SendError<Box<dyn Messenger>>),
}

#[derive(Debug, Clone)]
pub struct QueuePool {
tx: mpsc::Sender<Box<dyn plerkle_messenger::Messenger>>,
rx: Arc<Mutex<mpsc::Receiver<Box<dyn plerkle_messenger::Messenger>>>>,
}

impl QueuePool {
pub async fn try_from_config(config: QueueArgs) -> anyhow::Result<Self, QueuePoolError> {
let size = usize::try_from(config.messenger_queue_connections)?;
let (tx, rx) = mpsc::channel(size);

for _ in 0..config.messenger_queue_connections {
let messenger_config: MessengerConfig = config.clone().into();
let mut messenger = plerkle_messenger::select_messenger(messenger_config).await?;

let streams = [
(plerkle_messenger::ACCOUNT_STREAM, 100_000_000),
(plerkle_messenger::ACCOUNT_BACKFILL_STREAM, 100_000_000),
(plerkle_messenger::SLOT_STREAM, 100_000),
(plerkle_messenger::TRANSACTION_STREAM, 10_000_000),
(plerkle_messenger::TRANSACTION_BACKFILL_STREAM, 10_000_000),
(plerkle_messenger::BLOCK_STREAM, 100_000),
];

for &(key, size) in &streams {
messenger.add_stream(key).await?;
messenger.set_buffer_size(key, size).await;
}

tx.try_send(messenger)?;
}

Ok(Self {
tx,
rx: Arc::new(Mutex::new(rx)),
})
}

/// Pushes account backfill data to the appropriate stream.
///
/// This method sends account backfill data to the `ACCOUNT_BACKFILL_STREAM`.
/// It is used for backfilling account information in the system.
///
/// # Arguments
///
/// * `bytes` - A byte slice containing the account backfill data to be pushed.
///
/// # Returns
///
/// This method returns a `Result` which is `Ok` if the push is successful,
/// or an `Err` with a `QueuePoolError` if the push fails.
pub async fn push_account_backfill(&self, bytes: &[u8]) -> Result<(), QueuePoolError> {
self.push(ACCOUNT_BACKFILL_STREAM, bytes).await
}

/// Pushes transaction backfill data to the appropriate stream.
///
/// This method sends transaction backfill data to the `TRANSACTION_BACKFILL_STREAM`.
/// It is used for backfilling transaction information in the system.
///
/// # Arguments
///
/// * `bytes` - A byte slice containing the transaction backfill data to be pushed.
///
/// # Returns
///
/// This method returns a `Result` which is `Ok` if the push is successful,
/// or an `Err` with a `QueuePoolError` if the push fails.
pub async fn push_transaction_backfill(&self, bytes: &[u8]) -> Result<(), QueuePoolError> {
self.push(TRANSACTION_BACKFILL_STREAM, bytes).await
}

/// Pushes account data to the appropriate stream.
///
/// This method sends account data to the `ACCOUNT_STREAM`.
/// It is used for pushing real-time account updates to the system.
///
/// # Arguments
///
/// * `bytes` - A byte slice containing the account data to be pushed.
///
/// # Returns
///
/// This method returns a `Result` which is `Ok` if the push is successful,
/// or an `Err` with a `QueuePoolError` if the push fails.
pub async fn push_account(&self, bytes: &[u8]) -> Result<(), QueuePoolError> {
self.push(ACCOUNT_STREAM, bytes).await
}

/// Pushes transaction data to the appropriate stream.
///
/// This method sends transaction data to the `TRANSACTION_STREAM`.
/// It is used for pushing real-time transaction updates to the system.
///
/// # Arguments
///
/// * `bytes` - A byte slice containing the transaction data to be pushed.
///
/// # Returns
///
/// This method returns a `Result` which is `Ok` if the push is successful,
/// or an `Err` with a `QueuePoolError` if the push fails.
pub async fn push_transaction(&self, bytes: &[u8]) -> Result<(), QueuePoolError> {
self.push(TRANSACTION_STREAM, bytes).await
}

async fn push(&self, stream_key: &'static str, bytes: &[u8]) -> Result<(), QueuePoolError> {
let mut rx = self.rx.lock().await;
let mut messenger = rx
.recv()
.await
.ok_or(QueuePoolError::RecvMessengerConnection)?;

messenger.send(stream_key, bytes).await?;

self.tx.send(messenger).await?;

Ok(())
}
}
25 changes: 25 additions & 0 deletions ops/src/bubblegum/rpc.rs → core/src/solana_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ impl Rpc {
.await
}

pub async fn get_account(
&self,
pubkey: &Pubkey,
) -> Result<
solana_client::rpc_response::Response<std::option::Option<solana_sdk::account::Account>>,
ClientError,
> {
(|| async {
self.0
.get_account_with_config(
pubkey,
RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig {
commitment: CommitmentLevel::Finalized,
}),
..RpcAccountInfoConfig::default()
},
)
.await
})
.retry(&ExponentialBuilder::default())
.await
}

pub async fn get_program_accounts(
&self,
program: &Pubkey,
Expand Down
30 changes: 30 additions & 0 deletions ops/src/account/account_details.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use anyhow::Result;
use das_core::Rpc;
use solana_sdk::{account::Account, pubkey::Pubkey};

pub struct AccountDetails<'a> {
pub account: Account,
pub slot: u64,
pub pubkey: &'a Pubkey,
}

impl<'a> AccountDetails<'a> {
pub fn new(account: Account, slot: u64, pubkey: &'a Pubkey) -> Self {
Self {
account,
slot,
pubkey,
}
}

pub async fn fetch(rpc: &Rpc, pubkey: &'a Pubkey) -> Result<Self> {
let account_response = rpc.get_account(pubkey).await?;
let slot = account_response.context.slot;

let account = account_response
.value
.ok_or_else(|| anyhow::anyhow!("Account not found for pubkey: {}", pubkey))?;

Ok(Self::new(account, slot, pubkey))
}
}
32 changes: 32 additions & 0 deletions ops/src/account/cmd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use super::{program, single};
use anyhow::Result;
use clap::{Args, Subcommand};

#[derive(Debug, Clone, Subcommand)]
pub enum Commands {
/// The 'program' command is used to backfill the index against on-chain accounts owned by a program.
#[clap(name = "program")]
Program(program::Args),
/// The 'single' command is used to backfill the index against a single account.
#[clap(name = "single")]
Single(single::Args),
}

#[derive(Debug, Clone, Args)]
pub struct AccountCommand {
#[clap(subcommand)]
pub action: Commands,
}

pub async fn subcommand(subcommand: AccountCommand) -> Result<()> {
match subcommand.action {
Commands::Program(args) => {
program::run(args).await?;
}
Commands::Single(args) => {
single::run(args).await?;
}
}

Ok(())
}
6 changes: 6 additions & 0 deletions ops/src/account/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod account_details;
mod cmd;
mod program;
mod single;

pub use cmd::*;
Loading