refactor: fixes #4
Signed-off-by: Wilfred Almeida <[email protected]>
WilfredAlmeida committed May 5, 2024
1 parent d763c4c commit bfcd1a2
Showing 9 changed files with 241 additions and 47 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -2,4 +2,5 @@
.env
.vscode
.idea
migration/
migration/
.log
3 changes: 3 additions & 0 deletions README.md
@@ -64,6 +64,9 @@ Follow the steps mentioned below
### Testing
If the program is running without any errors, the database is being populated with information on new NFT mints. You can query the RPC API locally; it runs on the default URL `http://localhost:9090/`.

### Note
1. Asynchronous backfilling is broken and the logic needs to be redone
2. Backfilling is disabled by default; enable it by uncommenting the block at `src/main.rs:148`

### Support
If you need any help, have any thoughts, or need to get in touch, DM [Wilfred](https://twitter.com/WilfredAlmeida_) on Twitter/X or open an issue.
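As a companion to the Testing section above, here is a hedged sketch of a local query. It assumes the node exposes a DAS-style `getAsset` method and uses `reqwest` (with the `json` feature), `serde_json`, and `tokio`, none of which are confirmed by this diff; the asset id is a placeholder.

```rust
use serde_json::json;

// Hypothetical local query against the default URL from the README.
// Assumes a DAS-style `getAsset` method; "<asset id>" is a placeholder.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getAsset",
        "params": { "id": "<asset id>" }
    });

    let response = reqwest::Client::new()
        .post("http://localhost:9090/")
        .json(&request)
        .send()
        .await?
        .text()
        .await?;

    println!("{response}");
    Ok(())
}
```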
73 changes: 63 additions & 10 deletions src/backfill/backfill.rs
@@ -1,28 +1,81 @@
use sea_orm::{ConnectionTrait, DatabaseConnection, Statement};
use solana_rpc_client_api::response::RpcConfirmedTransactionStatusWithSignature;

use crate::{
config::transaction_queue::{push_front, TransactionsQueue},
rpc::rpc::{get_signatures_for_tree, get_transaction_with_retries},
rpc::rpc::get_signatures_for_tree,
};

use crate::processor::transaction::process_transaction;
pub async fn backfill_tree(tree_address: String, db_connection: DatabaseConnection) {
let mut last_processed_tx: Option<String> = None;
let mut until_signature: Option<String> = None;
let mut genesis_backfill_completed: bool = false;

let query = "SELECT last_processed_signature,genesis_backfill_completed FROM ld_merkle_trees WHERE address=$1;";

let query_res = db_connection
.query_one(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
query,
vec![sea_orm::Value::from(tree_address.as_str())],
))
.await
.unwrap();

match query_res {
Some(row) => {
let sign: Option<String> = row.try_get("", "last_processed_signature").unwrap_or(None);
println!("last_processed_signature: {:?}", sign);
last_processed_tx = sign;

let gbc: bool = row
.try_get("", "genesis_backfill_completed")
.unwrap_or(false);

genesis_backfill_completed = gbc;
}
None => {
println!(
"No last_processed_signature found for tree: {}",
tree_address
);
}
}

if genesis_backfill_completed {
until_signature = last_processed_tx.clone();
last_processed_tx.take();
}

println!(
"genesis_backfill_completed: {:?}",
genesis_backfill_completed
);
println!("until_signature: {:?}", until_signature);
println!("last processed: {:?}", last_processed_tx);

pub async fn backfill_tree(tree_address: String) {
// TODO: Get this from database
let mut last_processed_tx: Option<&String> = None;
let mut signatures: Vec<RpcConfirmedTransactionStatusWithSignature>;
// let transactions_queue = get_queue();

loop {
signatures = get_signatures_for_tree(&tree_address, last_processed_tx).await;
signatures = get_signatures_for_tree(
&tree_address,
last_processed_tx.as_ref(),
until_signature.as_ref(),
)
.await;

last_processed_tx = Some(&signatures[0].signature);
if signatures.len() == 0 {
break;
}

for signature in &signatures {
println!("backfill tx");
last_processed_tx = Some(signatures[signatures.len() - 1].signature.clone());
println!("last_processed_tx: {:?}", last_processed_tx);

for signature in &signatures {
println!("backfill tx {:?}", signature.signature);
push_front(TransactionsQueue {
transaction_signature: signature.signature.clone(),
tree_address: tree_address.clone().into(),
})
}

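The resume logic above keeps two bounds: while the genesis backfill is still running, `last_processed_tx` acts as a `before` cursor that pages backwards through the tree's history; once `genesis_backfill_completed` is set, the stored signature becomes an `until` bound so only newer signatures are fetched. `get_signatures_for_tree` itself lives in `src/rpc/rpc.rs` and is not part of this diff; a rough, hedged sketch of what such a helper could look like:

```rust
use std::str::FromStr;

use solana_client::nonblocking::rpc_client::RpcClient;
use solana_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
use solana_rpc_client_api::response::RpcConfirmedTransactionStatusWithSignature;
use solana_sdk::{pubkey::Pubkey, signature::Signature};

// Hedged sketch only: the real get_signatures_for_tree is in src/rpc/rpc.rs.
// `before` pages backwards during the genesis backfill; `until` stops at the
// last indexed signature once the genesis backfill has completed.
async fn fetch_signatures(
    rpc: &RpcClient,
    tree_address: &str,
    before: Option<&String>,
    until: Option<&String>,
) -> Vec<RpcConfirmedTransactionStatusWithSignature> {
    let config = GetConfirmedSignaturesForAddress2Config {
        before: before.and_then(|s| Signature::from_str(s).ok()),
        until: until.and_then(|s| Signature::from_str(s).ok()),
        limit: None,
        commitment: None,
    };

    rpc.get_signatures_for_address_with_config(
        &Pubkey::from_str(tree_address).expect("valid tree address"),
        config,
    )
    .await
    .unwrap_or_default()
}
```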
3 changes: 3 additions & 0 deletions src/config/transaction_queue.rs
@@ -1,8 +1,11 @@
use lazy_static::lazy_static;
use std::sync::Mutex;
use std::{borrow::BorrowMut, collections::VecDeque, sync::OnceLock};

#[derive(Clone, Debug)]
pub struct TransactionsQueue {
pub transaction_signature: String,
pub tree_address: Option<String>,
}

lazy_static! {
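The `tree_address` field added here is what later lets the queue processor update `last_processed_signature` for the right row in `ld_merkle_trees`: websocket transactions push `None`, backfilled ones carry the tree address. The queue helpers used throughout the diff (`push_back`, `push_front`, `pop_front`) are not shown; a minimal sketch of how they could sit around the `lazy_static` queue, assuming a `Mutex<VecDeque<TransactionsQueue>>`:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

use lazy_static::lazy_static;

#[derive(Clone, Debug)]
pub struct TransactionsQueue {
    pub transaction_signature: String,
    pub tree_address: Option<String>,
}

lazy_static! {
    // Assumed shape of the global queue; the real definition is in the
    // elided part of src/config/transaction_queue.rs.
    static ref TRANSACTIONS_QUEUE: Mutex<VecDeque<TransactionsQueue>> =
        Mutex::new(VecDeque::new());
}

// Websocket transactions go to the back, backfilled ones to the front.
pub fn push_back(tx: TransactionsQueue) {
    TRANSACTIONS_QUEUE.lock().unwrap().push_back(tx);
}

pub fn push_front(tx: TransactionsQueue) {
    TRANSACTIONS_QUEUE.lock().unwrap().push_front(tx);
}

pub fn pop_front() -> Option<TransactionsQueue> {
    TRANSACTIONS_QUEUE.lock().unwrap().pop_front()
}
```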
140 changes: 116 additions & 24 deletions src/main.rs
@@ -1,5 +1,7 @@
use std::iter;
use std::pin::Pin;
use std::thread::sleep;
use std::str::{Bytes, FromStr};
// use std::thread::sleep;
use std::time::Duration;

use crate::config::database::setup_database_config;
@@ -16,12 +18,14 @@ use mpl_bubblegum::accounts::MerkleTree;
use processor::logs::process_logs;
use processor::metadata::fetch_store_metadata;
use processor::queue_processor::process_transactions_queue;
use sea_orm::SqlxPostgresConnector;
use sea_orm::{ConnectionTrait, Database, DatabaseConnection, SqlxPostgresConnector, Statement};
use solana_client::rpc_config::{RpcTransactionLogsConfig, RpcTransactionLogsFilter};
use solana_client::rpc_response::{Response, RpcLogsResponse};
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use sqlx::{Acquire, PgPool};
use tokio::task;
use tokio::time::sleep;

mod backfill;
mod config;
@@ -38,24 +42,84 @@ async fn main() -> Result<()> {

let database_pool = setup_database_config(&env_config).await;

let db_connection = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool.clone());

// Refer https://github.com/WilfredAlmeida/LightDAS/issues/4 to understand why this is needed
let _ = db_connection
.execute(Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
String::from(
"CREATE TABLE IF NOT EXISTS LD_MERKLE_TREES (
ADDRESS VARCHAR(255),
TAG VARCHAR(255) NULL,
CAPACITY INT NULL,
MAX_DEPTH INT NULL,
CANOPY_DEPTH INT NULL,
MAX_BUFFER_SIZE INT NULL,
SHOULD_INDEX BOOLEAN DEFAULT TRUE,
GENESIS_BACKFILL_COMPLETED BOOLEAN DEFAULT FALSE,
LAST_PROCESSED_SIGNATURE VARCHAR(255) NULL,
CREATED_AT TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UPDATED_AT TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);",
),
))
.await?;

let pubsub_client = get_pubsub_client();

let tree_addresses: Vec<String> = vec![
// "GXTXbFwcbNdWbiCWzZc3J2XGofopnhN9T98jnG29D2Yw".to_string(),
// "Aju7YfPdhjaqJbRdow48PqxcWutDDHWww6eoDC9PVY7m".to_string(),
// "43XAHmPkq8Yth3swdqrh5aZvWrmuci5ZhPVLptreaUZ1".to_string(),
// "EQQiiEceUo2uxHQgtRt8W92frLXwMUwdvt7P9Yo26cUM".to_string(),
// "CkSa2n2eyJvsPLA7ufVos94NAUTYuVhaxrvH2GS69f9j".to_string()
// "Dbx2uKULg44XeBR28tNWu2dU4bPpGfuYrd7RntgGXvuT".to_string(),
// "CkSa2n2eyJvsPLA7ufVos94NAUTYuVhaxrvH2GS69f9j".to_string(),
// "EBFsHQKYCn1obUr2FVNvGTkaUYf2p5jao2MVdbK5UNRH".to_string(),
// "14b9wzhVSaiUHB4t8tDY9QYNsGStT8ycaoLkBHZLZwax".to_string(),
// "6kAoPaZV4aB1rMPTPkbgycb9iNbHHibSzjhAvWEroMm".to_string(),
// "FmUjM4YBLK93WSb7AnbuYZy1h2kCcjZM8kHsi9ZU93TP".to_string(),
// "6JTnMcq9a6atrqmsz4rgTWp9EG5YPzxoobD7vg1csNt5".to_string(),
// "HVGMVJ7DyfXLU2H5AJSHvX2HkFrRrHQAoXAHfYUmicYr".to_string(),
"D8yRakvsjWSR3ihANhwjP8RmNLg3A46EA1V1EbMLDT8B".to_string(),
];
let res = db_connection
.query_all(Statement::from_string(
sea_orm::DatabaseBackend::Postgres,
String::from("SELECT * FROM ld_merkle_trees;"),
))
.await;

let mut tree_addresses: Vec<String> = Vec::new();
match res {
Ok(rows) => {
if rows.len() == 0 {
panic!("Trees to index not found in database");
}

rows.iter().for_each(|row| {
let tree_address: Option<String> = row.try_get("", "address").unwrap();
match tree_address {
Some(s) => {
if let Ok(_) = Pubkey::from_str(s.as_str()) {
tree_addresses.push(s);
} else {
println!("Invalid tree address {:?}", s)
}
}
None => {}
}
});
}
Err(e) => {
panic!("Error fetching trees to index {:?}", e)
}
}

// let tree_addresses: Vec<String> = vec![
// // "GXTXbFwcbNdWbiCWzZc3J2XGofopnhN9T98jnG29D2Yw".to_string(),
// // "Aju7YfPdhjaqJbRdow48PqxcWutDDHWww6eoDC9PVY7m".to_string(),
// // "43XAHmPkq8Yth3swdqrh5aZvWrmuci5ZhPVLptreaUZ1".to_string(),
// // "EQQiiEceUo2uxHQgtRt8W92frLXwMUwdvt7P9Yo26cUM".to_string(),
// // "CkSa2n2eyJvsPLA7ufVos94NAUTYuVhaxrvH2GS69f9j".to_string()
// // "Dbx2uKULg44XeBR28tNWu2dU4bPpGfuYrd7RntgGXvuT".to_string(),
// // "CkSa2n2eyJvsPLA7ufVos94NAUTYuVhaxrvH2GS69f9j".to_string(),
// // "EBFsHQKYCn1obUr2FVNvGTkaUYf2p5jao2MVdbK5UNRH".to_string(),
// // "14b9wzhVSaiUHB4t8tDY9QYNsGStT8ycaoLkBHZLZwax".to_string(),
// // "6kAoPaZV4aB1rMPTPkbgycb9iNbHHibSzjhAvWEroMm".to_string(),
// // "FmUjM4YBLK93WSb7AnbuYZy1h2kCcjZM8kHsi9ZU93TP".to_string(),
// // "6JTnMcq9a6atrqmsz4rgTWp9EG5YPzxoobD7vg1csNt5".to_string(),
// // "HVGMVJ7DyfXLU2H5AJSHvX2HkFrRrHQAoXAHfYUmicYr".to_string(),
// // "D8yRakvsjWSR3ihANhwjP8RmNLg3A46EA1V1EbMLDT8B".to_string(),
// "B1eWW3tTBb5DHrwVrqJximAYLwucGzvjuJWxkFAe4v2X".to_string(),
// ];

println!("TREE ADDRESSES {:?}", tree_addresses);

let mut stream = select_all(
join_all(tree_addresses.iter().map(|address| {
@@ -81,9 +145,36 @@

task::spawn(handle_metadata_downloads(database_pool.clone()));

// join_all(tree_addresses.into_iter().map(backfill_tree)).await;

task::spawn(process_transactions_queue(database_pool.clone())).await?;
// join_all(tree_addresses.into_iter().map(|tr| {
// let db_connection_1 = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool.clone());
// let db_connection_2 = SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool.clone());

// let backfill_future = backfill_tree(tr.clone(), db_connection_1);

// async move {
// let _ = backfill_future.await;

// let _ = db_connection_2
// .execute(Statement::from_sql_and_values(
// sea_orm::DatabaseBackend::Postgres,
// "UPDATE ld_merkle_trees SET genesis_backfill_completed=$1 WHERE address=$2;",
// vec![
// sea_orm::Value::from(true),
// sea_orm::Value::from(tr.as_str()),
// ],
// ))
// .await;
// }
// }))
// .await;

// Tasks spawned to process transactions from the queue; adjust the count depending on your tree and queue sizes
futures::future::join_all(
iter::repeat_with(|| process_transactions_queue(database_pool.clone()))
.take(15)
.collect::<Vec<_>>(),
)
.await;

Ok(())
}
@@ -92,8 +183,9 @@ async fn handle_stream(
mut stream: SelectAll<Pin<Box<dyn Stream<Item = Response<RpcLogsResponse>> + Send>>>,
) {
loop {
let logs = stream.next().await.unwrap();
process_logs(logs.value).await;
if let Some(logs) = stream.next().await {
process_logs(logs.value).await;
}
}
}

@@ -102,6 +194,6 @@ async fn handle_metadata_downloads(pool: PgPool) {
loop {
let _ = fetch_store_metadata(&connection).await;
println!("No metadata to update, sleeping for 5 secs");
sleep(Duration::from_secs(5))
sleep(Duration::from_secs(5)).await;
}
}
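Since `main` now reads its tree list from `ld_merkle_trees` and panics with "Trees to index not found in database" when the table is empty, a tree has to be registered before the indexer is started. A minimal sketch using the same sea-orm `Statement` pattern as above; `register_tree` is a hypothetical helper, and the address is just the example tree from the now commented-out hard-coded list:

```rust
use sea_orm::{ConnectionTrait, DatabaseConnection, DbErr, Statement};

// Hedged sketch: inserts a tree address into ld_merkle_trees so the indexer
// picks it up on startup. Column names follow the CREATE TABLE above; the
// remaining columns fall back to their defaults.
async fn register_tree(db: &DatabaseConnection, tree_address: &str) -> Result<(), DbErr> {
    db.execute(Statement::from_sql_and_values(
        sea_orm::DatabaseBackend::Postgres,
        "INSERT INTO ld_merkle_trees (address) VALUES ($1);",
        vec![sea_orm::Value::from(tree_address)],
    ))
    .await?;
    Ok(())
}

// Example usage, e.g. from a one-off setup step:
// register_tree(&db_connection, "D8yRakvsjWSR3ihANhwjP8RmNLg3A46EA1V1EbMLDT8B").await?;
```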
5 changes: 3 additions & 2 deletions src/processor/logs.rs
@@ -9,7 +9,8 @@ pub async fn process_logs(logs_response: RpcLogsResponse) {
let transaction_signature = logs_response.signature;

println!("websocket tx");
push_back(TransactionsQueue{
transaction_signature: transaction_signature.clone()
push_back(TransactionsQueue {
transaction_signature: transaction_signature.clone(),
tree_address: None,
});
}
42 changes: 36 additions & 6 deletions src/processor/queue_processor.rs
@@ -1,4 +1,5 @@
use program_transformers::ProgramTransformer;
use sea_orm::{ConnectionTrait, SqlxPostgresConnector, Statement, Value};
use sqlx::PgPool;

use crate::{config::transaction_queue::pop_front, rpc::rpc::get_transaction_with_retries};
@@ -8,23 +9,52 @@

pub async fn process_transactions_queue(database_pool: PgPool) {
let program_transformer = ProgramTransformer::new(
database_pool,
database_pool.clone(),
Box::new(|_info| ready(Ok(())).boxed()),
false,
);
loop {
let transaction_signature = pop_front();
let transaction = pop_front();
// println!("TX: {:?}", transaction.clone());
let transaction_signature: &String;

match transaction_signature {
match transaction {
Some(txs) => {
let transaction = get_transaction_with_retries(&txs.transaction_signature)
transaction_signature = &txs.transaction_signature;
let tree_address = txs.tree_address;
let transaction = get_transaction_with_retries(&transaction_signature)
.await
.unwrap();

process_transaction(&program_transformer, transaction).await;
if let Ok(_) = process_transaction(&program_transformer, transaction).await {
if let Some(tree_address_string) = tree_address {
let db_connection =
SqlxPostgresConnector::from_sqlx_postgres_pool(database_pool.clone());

let query: &str="UPDATE ld_merkle_trees SET last_processed_signature=$1 WHERE address=$2;";

let res = db_connection
.execute(Statement::from_sql_and_values(
sea_orm::DatabaseBackend::Postgres,
query,
vec![
Value::from(transaction_signature.as_str()),
Value::from(tree_address_string.as_str()),
],
))
.await;

match res {
Ok(_) => {}
Err(e) => {
println!("Failed to update `ld_merkle_trees` column `last_processed_signature` with error {:?}", e);
}
}
}
}
}
None => {
println!("No transactions in queue");
// println!("No transactions in queue");
}
}
}