Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DAS OPS] Use ProgramTransformer in account backfill commands #143

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions ops/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@ das-core = { workspace = true }
digital_asset_types = { workspace = true }
env_logger = { workspace = true }
figment = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true }
indicatif = { workspace = true }
log = { workspace = true }
mpl-bubblegum = { workspace = true }
plerkle_messenger = { workspace = true }
plerkle_serialization = { workspace = true }
program_transformers = { workspace = true }
sea-orm = { workspace = true }
solana-account-decoder = { workspace = true }
Expand Down
30 changes: 0 additions & 30 deletions ops/src/account/account_details.rs

This file was deleted.

28 changes: 28 additions & 0 deletions ops/src/account/account_info.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use anyhow::Result;
use das_core::Rpc;
use program_transformers::AccountInfo;
use solana_sdk::pubkey::Pubkey;

#[derive(thiserror::Error, Debug)]
pub enum AccountInfoError {
#[error("account not found for pubkey: {pubkey}")]
NotFound { pubkey: Pubkey },
#[error("failed to fetch account info")]
SolanaRequestError(#[from] solana_client::client_error::ClientError),
}

pub async fn fetch(rpc: &Rpc, pubkey: Pubkey) -> Result<AccountInfo, AccountInfoError> {
let account_response = rpc.get_account(&pubkey).await?;
let slot = account_response.context.slot;

let account = account_response
.value
.ok_or_else(|| AccountInfoError::NotFound { pubkey })?;

Ok(AccountInfo {
slot,
pubkey,
owner: account.owner,
data: account.data,
})
}
2 changes: 1 addition & 1 deletion ops/src/account/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod account_details;
mod account_info;
mod cmd;
mod program;
mod single;
Expand Down
99 changes: 62 additions & 37 deletions ops/src/account/program.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use super::account_info;
use anyhow::Result;

use super::account_details::AccountDetails;
use clap::Parser;
use das_core::{MetricsArgs, QueueArgs, QueuePool, Rpc, SolanaRpcArgs};
use flatbuffers::FlatBufferBuilder;
use plerkle_serialization::{
serializer::serialize_account, solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2,
};
use das_core::{connect_db, MetricsArgs, PoolArgs, Rpc, SolanaRpcArgs};
use futures::future::{ready, FutureExt};
use futures::{stream::FuturesUnordered, StreamExt};
use log::error;
use program_transformers::{AccountInfo, ProgramTransformer};
use solana_sdk::pubkey::Pubkey;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task;

#[derive(Debug, Parser, Clone)]
pub struct Args {
/// Redis configuration
#[clap(flatten)]
pub queue: QueueArgs,

/// Metrics configuration
#[clap(flatten)]
pub metrics: MetricsArgs,
Expand All @@ -30,6 +28,18 @@ pub struct Args {
/// The public key of the program to backfill
#[clap(value_parser = parse_pubkey)]
pub program: Pubkey,

/// The maximum buffer size for accounts
#[arg(long, env, default_value = "10000")]
pub max_buffer_size: usize,

/// The number of worker threads
#[arg(long, env, default_value = "1000")]
pub account_worker_count: usize,

/// Database configuration
#[clap(flatten)]
pub database: PoolArgs,
}

fn parse_pubkey(s: &str) -> Result<Pubkey, &'static str> {
Expand All @@ -38,44 +48,59 @@ fn parse_pubkey(s: &str) -> Result<Pubkey, &'static str> {

pub async fn run(config: Args) -> Result<()> {
let rpc = Rpc::from_config(&config.solana);
let queue = QueuePool::try_from_config(&config.queue).await?;
let pool = connect_db(&config.database).await?;
let num_workers = config.account_worker_count;

let accounts = rpc.get_program_accounts(&config.program, None).await?;
let (tx, mut rx) = mpsc::channel::<Vec<AccountInfo>>(config.max_buffer_size);

let mut workers = FuturesUnordered::new();
let program_transformer = Arc::new(ProgramTransformer::new(
pool,
Box::new(|_info| ready(Ok(())).boxed()),
false,
));

let account_info_worker_manager = tokio::spawn(async move {
while let Some(account_infos) = rx.recv().await {
if workers.len() >= num_workers {
workers.next().await;
}

for account_info in account_infos {
let program_transformer = Arc::clone(&program_transformer);

let worker = task::spawn(async move {
if let Err(e) = program_transformer
.handle_account_update(&account_info)
.await
{
error!("Failed to handle account update: {:?}", e);
}
});

workers.push(worker);
}
}

while (workers.next().await).is_some() {}
});

let accounts = rpc.get_program_accounts(&config.program, None).await?;
let accounts_chunks = accounts.chunks(config.batch_size);

for batch in accounts_chunks {
let results = futures::future::try_join_all(
batch
.iter()
.map(|(pubkey, _account)| AccountDetails::fetch(&rpc, pubkey)),
.cloned()
.map(|(pubkey, _account)| account_info::fetch(&rpc, pubkey)),
)
.await?;

for account_detail in results {
let AccountDetails {
account,
slot,
pubkey,
} = account_detail;
let builder = FlatBufferBuilder::new();
let account_info = ReplicaAccountInfoV2 {
pubkey: &pubkey.to_bytes(),
lamports: account.lamports,
owner: &account.owner.to_bytes(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: &account.data,
write_version: 0,
txn_signature: None,
};

let fbb = serialize_account(builder, &account_info, slot, false);
let bytes = fbb.finished_data();

queue.push_account_backfill(bytes).await?;
}
tx.send(results).await?;
}

account_info_worker_manager.await?;

Ok(())
}
47 changes: 16 additions & 31 deletions ops/src/account/single.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use anyhow::Result;

use super::account_details::AccountDetails;
use super::account_info;
use clap::Parser;
use das_core::{MetricsArgs, QueueArgs, QueuePool, Rpc, SolanaRpcArgs};
use flatbuffers::FlatBufferBuilder;
use plerkle_serialization::{
serializer::serialize_account, solana_geyser_plugin_interface_shims::ReplicaAccountInfoV2,
};
use das_core::{connect_db, MetricsArgs, PoolArgs, Rpc, SolanaRpcArgs};
use futures::future::{ready, FutureExt};
use program_transformers::ProgramTransformer;
use solana_sdk::pubkey::Pubkey;

#[derive(Debug, Parser, Clone)]
pub struct Args {
/// Redis configuration
/// Database configuration
#[clap(flatten)]
pub queue: QueueArgs,
pub database: PoolArgs,

/// Metrics configuration
#[clap(flatten)]
Expand All @@ -33,29 +31,16 @@ fn parse_pubkey(s: &str) -> Result<Pubkey, &'static str> {

pub async fn run(config: Args) -> Result<()> {
let rpc = Rpc::from_config(&config.solana);
let queue = QueuePool::try_from_config(&config.queue).await?;

let AccountDetails {
account,
slot,
pubkey,
} = AccountDetails::fetch(&rpc, &config.account).await?;
let builder = FlatBufferBuilder::new();
let account_info = ReplicaAccountInfoV2 {
pubkey: &pubkey.to_bytes(),
lamports: account.lamports,
owner: &account.owner.to_bytes(),
executable: account.executable,
rent_epoch: account.rent_epoch,
data: &account.data,
write_version: 0,
txn_signature: None,
};

let fbb = serialize_account(builder, &account_info, slot, false);
let bytes = fbb.finished_data();

queue.push_account_backfill(bytes).await?;
let pool = connect_db(&config.database).await?;

let program_transformer =
ProgramTransformer::new(pool, Box::new(|_info| ready(Ok(())).boxed()), false);

let account_info = account_info::fetch(&rpc, config.account).await?;

program_transformer
.handle_account_update(&account_info)
.await?;

Ok(())
}
17 changes: 1 addition & 16 deletions ops/src/bubblegum/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,10 @@ Command line arguments can also be set through environment variables.

### Backfill

The `backfill` command initiates the crawling and backfilling process. It requires the Solana RPC URL, the database URL, and the messenger Redis URL.
The `backfill` command initiates the crawling and backfilling process. It requires the Solana RPC URL, the database URL.

**warning**: The command expects full archive access to transactions. Before proceeding ensure your RPC is able to serve complete transaction history for Solana.

```mermaid
flowchart
start((Start)) -->init[Initialize RPC, DB]
init --> fetchTreesDB[Fetch Trees from DB]
fetchTreesDB --> findGapsDB[Find Gaps in DB]
findGapsDB --> enqueueGapFills[Enqueue Gap Fills]
enqueueGapFills --> gapWorkerManager[Gap Worker Manager]
gapWorkerManager --> crawlSignatures[Crawl Solana RPC]
crawlSignatures --> enqueueSignatures[Enqueue Signatures]
enqueueSignatures --> transactionWorkerManager[Transaction Worker Manager]
transactionWorkerManager --> fetchTransactionsRPC[Fetch Transactions RPC]
fetchTransactionsRPC --> processTransactions[Push Transaction to Messenger]
processTransactions ---> Finished
```

```
Usage: das-ops bubblegum backfill [OPTIONS] --database-url <DATABASE_URL> --messenger-redis-url <MESSENGER_REDIS_URL> --solana-rpc-url <SOLANA_RPC_URL>

Expand Down
Loading