Skip to content

Commit

Permalink
refactor: use program transform in the account backfill no queuing in…
Browse files Browse the repository at this point in the history
…to redis.
  • Loading branch information
kespinola committed Jun 11, 2024
1 parent 997ec83 commit 44f4a97
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 121 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions ops/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@ das-core = { workspace = true }
digital_asset_types = { workspace = true }
env_logger = { workspace = true }
figment = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true }
indicatif = { workspace = true }
log = { workspace = true }
mpl-bubblegum = { workspace = true }
plerkle_messenger = { workspace = true }
plerkle_serialization = { workspace = true }
program_transformers = { workspace = true }
sea-orm = { workspace = true }
solana-account-decoder = { workspace = true }
Expand Down
30 changes: 0 additions & 30 deletions ops/src/account/account_details.rs

This file was deleted.

28 changes: 28 additions & 0 deletions ops/src/account/account_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use anyhow::Result;
use das_core::Rpc;
use program_transformers::AccountInfo;
use solana_sdk::pubkey::Pubkey;

/// Errors produced while fetching an on-chain account over RPC for backfill.
#[derive(thiserror::Error, Debug)]
pub enum AccountInfoError {
    /// The RPC responded successfully but no account exists at the pubkey.
    #[error("account not found for pubkey: {pubkey}")]
    NotFound { pubkey: Pubkey },
    /// The RPC request itself failed (network/server error).
    #[error("failed to fetch account info")]
    SolanaRequestError(#[from] solana_client::client_error::ClientError),
}

/// Fetches the account at `pubkey` and packages it as a
/// `program_transformers::AccountInfo` ready for ingestion.
///
/// # Errors
/// Returns [`AccountInfoError::NotFound`] when the RPC answers but the
/// account does not exist, and [`AccountInfoError::SolanaRequestError`]
/// when the RPC request itself fails.
pub async fn fetch(rpc: &Rpc, pubkey: Pubkey) -> Result<AccountInfo, AccountInfoError> {
    let response = rpc.get_account(&pubkey).await?;
    // Capture the slot the RPC observed before consuming the response value.
    let slot = response.context.slot;

    match response.value {
        Some(account) => Ok(AccountInfo {
            slot,
            pubkey,
            owner: account.owner,
            data: account.data,
        }),
        None => Err(AccountInfoError::NotFound { pubkey }),
    }
}
2 changes: 1 addition & 1 deletion ops/src/account/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod account_details;
mod account_info;
mod cmd;
mod program;
mod single;
Expand Down
99 changes: 62 additions & 37 deletions ops/src/account/program.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use super::account_info;
use anyhow::Result;

use super::account_details::AccountDetails;
use clap::Parser;
use das_core::{MetricsArgs, QueueArgs, QueuePool, Rpc, SolanaRpcArgs};
use flatbuffers::FlatBufferBuilder;
use plerkle_serialization::{
serializer::serialize_account, solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2,
};
use das_core::{connect_db, MetricsArgs, PoolArgs, Rpc, SolanaRpcArgs};
use futures::future::{ready, FutureExt};
use futures::{stream::FuturesUnordered, StreamExt};
use log::error;
use program_transformers::{AccountInfo, ProgramTransformer};
use solana_sdk::pubkey::Pubkey;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task;

#[derive(Debug, Parser, Clone)]
pub struct Args {
/// Redis configuration
#[clap(flatten)]
pub queue: QueueArgs,

/// Metrics configuration
#[clap(flatten)]
pub metrics: MetricsArgs,
Expand All @@ -30,6 +28,18 @@ pub struct Args {
/// The public key of the program to backfill
#[clap(value_parser = parse_pubkey)]
pub program: Pubkey,

/// The maximum buffer size for accounts
#[arg(long, env, default_value = "10000")]
pub max_buffer_size: usize,

/// The number of worker threads
#[arg(long, env, default_value = "1000")]
pub account_worker_count: usize,

/// Database configuration
#[clap(flatten)]
pub database: PoolArgs,
}

fn parse_pubkey(s: &str) -> Result<Pubkey, &'static str> {
Expand All @@ -38,44 +48,59 @@ fn parse_pubkey(s: &str) -> Result<Pubkey, &'static str> {

pub async fn run(config: Args) -> Result<()> {
let rpc = Rpc::from_config(&config.solana);
let queue = QueuePool::try_from_config(&config.queue).await?;
let pool = connect_db(&config.database).await?;
let num_workers = config.account_worker_count;

let accounts = rpc.get_program_accounts(&config.program, None).await?;
let (tx, mut rx) = mpsc::channel::<Vec<AccountInfo>>(config.max_buffer_size);

let mut workers = FuturesUnordered::new();
let program_transformer = Arc::new(ProgramTransformer::new(
pool,
Box::new(|_info| ready(Ok(())).boxed()),
false,
));

let account_info_worker_manager = tokio::spawn(async move {
while let Some(account_infos) = rx.recv().await {
if workers.len() >= num_workers {
workers.next().await;
}

for account_info in account_infos {
let program_transformer = Arc::clone(&program_transformer);

let worker = task::spawn(async move {
if let Err(e) = program_transformer
.handle_account_update(&account_info)
.await
{
error!("Failed to handle account update: {:?}", e);
}
});

workers.push(worker);
}
}

while (workers.next().await).is_some() {}
});

let accounts = rpc.get_program_accounts(&config.program, None).await?;
let accounts_chunks = accounts.chunks(config.batch_size);

for batch in accounts_chunks {
let results = futures::future::try_join_all(
batch
.iter()
.map(|(pubkey, _account)| AccountDetails::fetch(&rpc, pubkey)),
.cloned()
.map(|(pubkey, _account)| account_info::fetch(&rpc, pubkey)),
)
.await?;

for account_detail in results {
let AccountDetails {
account,
slot,
pubkey,
} = account_detail;
let builder = FlatBufferBuilder::new();
let account_info = ReplicaAccountInfoV2 {
pubkey: &pubkey.to_bytes(),
lamports: account.lamports,
owner: &account.owner.to_bytes(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: &account.data,
write_version: 0,
txn_signature: None,
};

let fbb = serialize_account(builder, &account_info, slot, false);
let bytes = fbb.finished_data();

queue.push_account_backfill(bytes).await?;
}
tx.send(results).await?;
}

account_info_worker_manager.await?;

Ok(())
}
47 changes: 16 additions & 31 deletions ops/src/account/single.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use anyhow::Result;

use super::account_details::AccountDetails;
use super::account_info;
use clap::Parser;
use das_core::{MetricsArgs, QueueArgs, QueuePool, Rpc, SolanaRpcArgs};
use flatbuffers::FlatBufferBuilder;
use plerkle_serialization::{
serializer::serialize_account, solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2,
};
use das_core::{connect_db, MetricsArgs, PoolArgs, Rpc, SolanaRpcArgs};
use futures::future::{ready, FutureExt};
use program_transformers::ProgramTransformer;
use solana_sdk::pubkey::Pubkey;

#[derive(Debug, Parser, Clone)]
pub struct Args {
/// Redis configuration
/// Database configuration
#[clap(flatten)]
pub queue: QueueArgs,
pub database: PoolArgs,

/// Metrics configuration
#[clap(flatten)]
Expand All @@ -33,29 +31,16 @@ fn parse_pubkey(s: &str) -> Result<Pubkey, &'static str> {

/// Backfills a single account (`config.account`) by fetching it over RPC and
/// handing it directly to the program transformer (no Redis queuing).
///
/// # Errors
/// Propagates database-connection, RPC-fetch, and transform errors.
pub async fn run(config: Args) -> Result<()> {
    let rpc = Rpc::from_config(&config.solana);
    let pool = connect_db(&config.database).await?;

    // Download callback is a no-op: this backfill does not fetch off-chain data.
    let program_transformer =
        ProgramTransformer::new(pool, Box::new(|_info| ready(Ok(())).boxed()), false);

    let account_info = account_info::fetch(&rpc, config.account).await?;

    program_transformer
        .handle_account_update(&account_info)
        .await?;

    Ok(())
}
17 changes: 1 addition & 16 deletions ops/src/bubblegum/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,10 @@ Command line arguments can also be set through environment variables.

### Backfill

The `backfill` command initiates the crawling and backfilling process. It requires the Solana RPC URL, the database URL, and the messenger Redis URL.
The `backfill` command initiates the crawling and backfilling process. It requires the Solana RPC URL and the database URL.

**warning**: The command expects full archive access to transactions. Before proceeding ensure your RPC is able to serve complete transaction history for Solana.

```mermaid
flowchart
start((Start)) -->init[Initialize RPC, DB]
init --> fetchTreesDB[Fetch Trees from DB]
fetchTreesDB --> findGapsDB[Find Gaps in DB]
findGapsDB --> enqueueGapFills[Enqueue Gap Fills]
enqueueGapFills --> gapWorkerManager[Gap Worker Manager]
gapWorkerManager --> crawlSignatures[Crawl Solana RPC]
crawlSignatures --> enqueueSignatures[Enqueue Signatures]
enqueueSignatures --> transactionWorkerManager[Transaction Worker Manager]
transactionWorkerManager --> fetchTransactionsRPC[Fetch Transactions RPC]
fetchTransactionsRPC --> processTransactions[Push Transaction to Messenger]
processTransactions ---> Finished
```

```
Usage: das-ops bubblegum backfill [OPTIONS] --database-url <DATABASE_URL> --messenger-redis-url <MESSENGER_REDIS_URL> --solana-rpc-url <SOLANA_RPC_URL>
Expand Down

0 comments on commit 44f4a97

Please sign in to comment.