From b4a19ae1a39270c3fcfb13632294bce660b9dec9 Mon Sep 17 00:00:00 2001 From: Sergey Timoshin Date: Sun, 5 Jan 2025 13:44:35 +0000 Subject: [PATCH] forester batch ops refactored --- Cargo.lock | 3 + forester-utils/Cargo.toml | 2 +- forester-utils/src/forester_epoch.rs | 2 + forester-utils/src/indexer/mod.rs | 8 + forester-utils/src/instructions.rs | 446 +++++++++ forester-utils/src/lib.rs | 1 + forester/Cargo.toml | 2 + forester/package.json | 3 +- forester/src/batch_operations.rs | 319 +++++++ forester/src/batch_processor/address.rs | 54 ++ forester/src/batch_processor/common.rs | 91 ++ forester/src/batch_processor/error.rs | 55 ++ forester/src/batch_processor/mod.rs | 30 + forester/src/batch_processor/state.rs | 129 +++ forester/src/batched_address_ops.rs | 313 +++++++ forester/src/batched_ops.rs | 854 +++++++++--------- forester/src/epoch_manager.rs | 83 +- forester/src/forester_status.rs | 1 + forester/src/lib.rs | 5 +- forester/src/rollover/operations.rs | 62 ++ forester/src/tree_data_sync.rs | 26 +- forester/tests/batched_address_test.rs | 255 ++++++ ...ched_ops_test.rs => batched_state_test.rs} | 3 +- forester/tests/e2e_test.rs | 3 + .../src/create_address_test_program_sdk.rs | 12 +- .../utils/src/indexer/test_indexer.rs | 433 +++++---- prover/client/src/gnark/helpers.rs | 10 + 27 files changed, 2557 insertions(+), 648 deletions(-) create mode 100644 forester-utils/src/instructions.rs create mode 100644 forester/src/batch_operations.rs create mode 100644 forester/src/batch_processor/address.rs create mode 100644 forester/src/batch_processor/common.rs create mode 100644 forester/src/batch_processor/error.rs create mode 100644 forester/src/batch_processor/mod.rs create mode 100644 forester/src/batch_processor/state.rs create mode 100644 forester/src/batched_address_ops.rs create mode 100644 forester/tests/batched_address_test.rs rename forester/tests/{batched_ops_test.rs => batched_state_test.rs} (99%) diff --git a/Cargo.lock b/Cargo.lock index 725fe0228..0e682d48f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2003,6 +2003,7 @@ version = "1.1.0" dependencies = [ "account-compression", "anchor-lang", + "anyhow", "async-trait", "bincode", "borsh 0.10.3", @@ -2027,6 +2028,7 @@ dependencies = [ "light-test-utils", "light-utils 1.1.0", "light-verifier", + "log", "photon-api", "prometheus", "reqwest 0.11.27", @@ -2066,6 +2068,7 @@ dependencies = [ "light-registry", "light-system-program", "light-utils 1.1.0", + "light-verifier", "log", "num-bigint 0.4.6", "num-traits", diff --git a/forester-utils/Cargo.toml b/forester-utils/Cargo.toml index 4b50e00a5..9186a6c07 100644 --- a/forester-utils/Cargo.toml +++ b/forester-utils/Cargo.toml @@ -20,7 +20,7 @@ light-registry = { workspace = true } light-system-program = { workspace = true, features = ["cpi"] } light-utils = { workspace = true } light-batched-merkle-tree = { workspace = true } - +light-verifier = { workspace = true } photon-api = { workspace = true } light-client = { workspace = true } diff --git a/forester-utils/src/forester_epoch.rs b/forester-utils/src/forester_epoch.rs index 566f69226..81e48df3d 100644 --- a/forester-utils/src/forester_epoch.rs +++ b/forester-utils/src/forester_epoch.rs @@ -86,6 +86,7 @@ pub enum TreeType { Address, State, BatchedState, + BatchedAddress, } impl Display for TreeType { @@ -94,6 +95,7 @@ impl Display for TreeType { TreeType::Address => write!(f, "address"), TreeType::State => write!(f, "state"), TreeType::BatchedState => write!(f, "batched state"), + TreeType::BatchedAddress => write!(f, "batched address"), } } } diff --git a/forester-utils/src/indexer/mod.rs b/forester-utils/src/indexer/mod.rs index 91040afd7..854633fbf 100644 --- a/forester-utils/src/indexer/mod.rs +++ b/forester-utils/src/indexer/mod.rs @@ -258,6 +258,14 @@ pub trait Indexer: Sync + Send + Debug + 'static { ) { unimplemented!() } + + async fn finalize_batched_address_tree_update( + &mut self, + _rpc: &mut R, + _merkle_tree_pubkey: Pubkey, + ) { + unimplemented!() + } } #[derive(Debug, Clone)] diff --git a/forester-utils/src/instructions.rs b/forester-utils/src/instructions.rs new file mode 100644 index 000000000..3dd1692e5 --- /dev/null +++ b/forester-utils/src/instructions.rs @@ -0,0 +1,446 @@ +use log::error; +use reqwest::Client; +use solana_sdk::pubkey::Pubkey; +use thiserror::Error; +use light_batched_merkle_tree::constants::{DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, DEFAULT_BATCH_STATE_TREE_HEIGHT}; +use light_batched_merkle_tree::merkle_tree::{AppendBatchProofInputsIx, BatchProofInputsIx, BatchedMerkleTreeAccount, InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs}; +use light_batched_merkle_tree::queue::BatchedQueueAccount; +use light_client::rpc::RpcConnection; +use light_hasher::{Hasher, Poseidon}; +use light_prover_client::batch_address_append::get_batch_address_append_circuit_inputs; +use light_prover_client::batch_append_with_proofs::get_batch_append_with_proofs_inputs; +use light_prover_client::batch_update::get_batch_update_inputs; +use light_prover_client::gnark::batch_address_append_json_formatter::to_json; +use light_prover_client::gnark::batch_append_with_proofs_json_formatter::BatchAppendWithProofsInputsJson; +use light_prover_client::gnark::batch_update_json_formatter::update_inputs_string; +use light_prover_client::gnark::constants::{PROVE_PATH, SERVER_ADDRESS}; +use light_prover_client::gnark::proof_helpers::{compress_proof, deserialize_gnark_proof_json, proof_from_json_struct}; +use light_utils::bigint::bigint_to_be_bytes_array; +use light_verifier::CompressedProof; +use crate::indexer::Indexer; + +#[derive(Error, Debug)] +pub enum ForesterUtilsError { + #[error("parse error: {0:?}")] + ParseError(String), + #[error("prover error: {0:?}")] + ProverError(String), + #[error("rpc error: {0:?}")] + RpcError(String), + #[error("indexer error: {0:?}")] + IndexerError(String), +} + +pub async fn create_batch_update_address_tree_instruction_data>(rpc: &mut R, indexer: &mut I, merkle_tree_pubkey: Pubkey) -> Result<(InstructionDataBatchNullifyInputs, usize), ForesterUtilsError > { + let mut merkle_tree_account = rpc.get_account(merkle_tree_pubkey).await + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get account data from rpc: {:?}", + e + ); + ForesterUtilsError::RpcError("Failed to get account data".into()) + })? + .unwrap(); + + let ( + old_root_index, + leaves_hashchain, + start_index, + current_root, + batch_size, + full_batch_index, + ) = { + let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + + let old_root_index = merkle_tree.root_history.last_index(); + let full_batch_index = merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index; + let batch = &merkle_tree.batches[full_batch_index as usize]; + let zkp_batch_index = batch.get_num_inserted_zkps(); + let leaves_hashchain = + merkle_tree.hashchain_store[full_batch_index as usize][zkp_batch_index as usize]; + let start_index = merkle_tree.get_metadata().next_index; + let current_root = *merkle_tree.root_history.last().unwrap(); + let batch_size = batch.zkp_batch_size as usize; + + ( + old_root_index, + leaves_hashchain, + start_index, + current_root, + batch_size, + full_batch_index, + ) + }; + + let batch_start_index = indexer + .get_address_merkle_trees() + .iter() + .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) + .unwrap() + .merkle_tree + .merkle_tree + .rightmost_index; + + let addresses = indexer + .get_queue_elements( + merkle_tree_pubkey.to_bytes(), + full_batch_index, + 0, + batch_size as u64, + ) + .await + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get queue elements from indexer: {:?}", + e + ); + ForesterUtilsError::IndexerError("Failed to get queue elements".into()) + })?; + + let batch_size = addresses.len(); + + // Get proof info after addresses are retrieved + let non_inclusion_proofs = indexer + .get_multiple_new_address_proofs_full( + merkle_tree_pubkey.to_bytes(), + addresses.clone(), + ) + .await + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get get_multiple_new_address_proofs_full from indexer: {:?}", + e + ); + ForesterUtilsError::IndexerError("Failed to get get_multiple_new_address_proofs_full".into()) + })?; + + let mut low_element_values = Vec::new(); + let mut low_element_indices = Vec::new(); + let mut low_element_next_indices = Vec::new(); + let mut low_element_next_values = Vec::new(); + let mut low_element_proofs: Vec> = Vec::new(); + + for non_inclusion_proof in &non_inclusion_proofs { + low_element_values.push(non_inclusion_proof.low_address_value); + low_element_indices.push(non_inclusion_proof.low_address_index as usize); + low_element_next_indices.push(non_inclusion_proof.low_address_next_index as usize); + low_element_next_values.push(non_inclusion_proof.low_address_next_value); + low_element_proofs.push(non_inclusion_proof.low_address_proof.to_vec()); + } + + let subtrees = indexer + .get_subtrees(merkle_tree_pubkey.to_bytes()) + .await + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get subtrees from indexer: {:?}", + e + ); + ForesterUtilsError::IndexerError("Failed to get subtrees".into()) + })? + .try_into() + .unwrap(); + + let inputs = get_batch_address_append_circuit_inputs::< + { DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize }, + >( + start_index as usize, + current_root, + low_element_values, + low_element_next_values, + low_element_indices, + low_element_next_indices, + low_element_proofs, + addresses, + subtrees, + leaves_hashchain, + batch_start_index, + batch_size, + ) + .map_err(|e| { + error!( + "create_batch_update_address_tree_instruction_data: failed to get circuit inputs: {:?}", + e + ); + ForesterUtilsError::ProverError("Failed to get circuit inputs".into()) + })?; + + let client = Client::new(); + let circuit_inputs_new_root = bigint_to_be_bytes_array::<32>(&inputs.new_root).unwrap(); + let inputs = to_json(&inputs); + + let response_result = client + .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) + .header("Content-Type", "text/plain; charset=utf-8") + .body(inputs) + .send() + .await + .expect("Failed to execute request."); + + if response_result.status().is_success() { + let body = response_result.text().await.unwrap(); + let proof_json = deserialize_gnark_proof_json(&body).unwrap(); + let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); + let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); + let instruction_data = InstructionDataBatchNullifyInputs { + public_inputs: BatchProofInputsIx { + new_root: circuit_inputs_new_root, + old_root_index: old_root_index as u16, + }, + compressed_proof: CompressedProof { + a: proof_a, + b: proof_b, + c: proof_c, + }, + }; + Ok((instruction_data, batch_size)) + } else { + Err(ForesterUtilsError::ProverError( + "Prover failed to generate proof".to_string(), + )) + } +} + +pub async fn create_append_batch_ix_data>(rpc: &mut R, indexer: &mut I, merkle_tree_pubkey: Pubkey, output_queue_pubkey: Pubkey) -> Result { + let (merkle_tree_next_index, current_root) = { + let mut merkle_tree_account = rpc + .get_account(merkle_tree_pubkey) + .await + .unwrap() + .unwrap(); + let merkle_tree = BatchedMerkleTreeAccount::state_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + ( + merkle_tree.get_metadata().next_index, + *merkle_tree.root_history.last().unwrap(), + ) + }; + + let (zkp_batch_size, full_batch_index, num_inserted_zkps, leaves_hashchain) = { + let mut output_queue_account = rpc + .get_account(output_queue_pubkey) + .await + .unwrap() + .unwrap(); + let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( + output_queue_account.data.as_mut_slice(), + ) + .unwrap(); + + let queue_metadata = output_queue.get_metadata(); + let full_batch_index = queue_metadata.batch_metadata.next_full_batch_index; + let zkp_batch_size = queue_metadata.batch_metadata.zkp_batch_size; + + let num_inserted_zkps = + output_queue.batches[full_batch_index as usize].get_num_inserted_zkps(); + + let leaves_hashchain = + output_queue.hashchain_store[full_batch_index as usize][num_inserted_zkps as usize]; + + ( + zkp_batch_size, + full_batch_index, + num_inserted_zkps, + leaves_hashchain, + ) + }; + let start = num_inserted_zkps as usize * zkp_batch_size as usize; + let end = start + zkp_batch_size as usize; + + let leaves = indexer + .get_queue_elements( + merkle_tree_pubkey.to_bytes(), + full_batch_index, + start as u64, + end as u64, + ) + .await + .unwrap(); + + let (old_leaves, merkle_proofs) = { + let mut old_leaves = vec![]; + let mut merkle_proofs = vec![]; + let indices = (merkle_tree_next_index..merkle_tree_next_index + zkp_batch_size) + .collect::>(); + let proofs = indexer + .get_proofs_by_indices(merkle_tree_pubkey, &indices); + proofs.iter().for_each(|proof| { + old_leaves.push(proof.leaf); + merkle_proofs.push(proof.proof.clone()); + }); + + (old_leaves, merkle_proofs) + }; + + let (proof, new_root) = { + let circuit_inputs = get_batch_append_with_proofs_inputs::< + { DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }, + >( + current_root, + merkle_tree_next_index as u32, + leaves, + leaves_hashchain, + old_leaves, + merkle_proofs, + zkp_batch_size as u32, + ) + .unwrap(); + + let client = Client::new(); + let inputs_json = + BatchAppendWithProofsInputsJson::from_inputs(&circuit_inputs).to_string(); + + let response = client + .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) + .header("Content-Type", "text/plain; charset=utf-8") + .body(inputs_json) + .send() + .await + .expect("Failed to execute request."); + + if response.status().is_success() { + let body = response.text().await.unwrap(); + let proof_json = deserialize_gnark_proof_json(&body).unwrap(); + let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); + let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); + ( + CompressedProof { + a: proof_a, + b: proof_b, + c: proof_c, + }, + bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()) + .unwrap(), + ) + } else { + error!( + "create_append_batch_ix_data: failed to get proof from server: {:?}", + response.text().await + ); + return Err(ForesterUtilsError::ProverError( + "Failed to get proof from server".into(), + )); + } + }; + + Ok(InstructionDataBatchAppendInputs { + public_inputs: AppendBatchProofInputsIx { new_root }, + compressed_proof: proof, + }) +} + + +pub async fn create_nullify_batch_ix_data>(rpc: &mut R, indexer: &mut I, merkle_tree_pubkey: Pubkey) -> Result { + let (zkp_batch_size, old_root, old_root_index, leaves_hashchain) = { + let mut account = rpc + .get_account(merkle_tree_pubkey) + .await + .unwrap() + .unwrap(); + let merkle_tree = + BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) + .unwrap(); + let metadata = merkle_tree.get_metadata(); + let batch_idx = metadata.queue_metadata.next_full_batch_index as usize; + let zkp_size = metadata.queue_metadata.zkp_batch_size; + let batch = &merkle_tree.batches[batch_idx]; + let zkp_idx = batch.get_num_inserted_zkps(); + let hashchain = merkle_tree.hashchain_store[batch_idx][zkp_idx as usize]; + let root_idx = merkle_tree.root_history.last_index(); + let root = *merkle_tree.root_history.last().unwrap(); + (zkp_size, root, root_idx, hashchain) + }; + + let leaf_indices_tx_hashes = indexer + .get_leaf_indices_tx_hashes(merkle_tree_pubkey, zkp_batch_size as usize); + + let mut leaves = Vec::new(); + let mut tx_hashes = Vec::new(); + let mut old_leaves = Vec::new(); + let mut path_indices = Vec::new(); + let mut merkle_proofs = Vec::new(); + let mut nullifiers = Vec::new(); + + let proofs = indexer.get_proofs_by_indices( + merkle_tree_pubkey, + &leaf_indices_tx_hashes + .iter() + .map(|(index, _, _)| *index as u64) + .collect::>(), + ); + + for ((index, leaf, tx_hash), proof) in leaf_indices_tx_hashes.iter().zip(proofs.iter()) { + path_indices.push(*index); + leaves.push(*leaf); + old_leaves.push(proof.leaf); + merkle_proofs.push(proof.proof.clone()); + tx_hashes.push(*tx_hash); + let index_bytes = index.to_be_bytes(); + let nullifier = Poseidon::hashv(&[leaf, &index_bytes, tx_hash]).unwrap(); + nullifiers.push(nullifier); + } + + let inputs = get_batch_update_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( + old_root, + tx_hashes, + leaves.to_vec(), + leaves_hashchain, + old_leaves, + merkle_proofs, + path_indices, + zkp_batch_size as u32, + ) + .unwrap(); + + let new_root = + bigint_to_be_bytes_array::<32>(&inputs.new_root.to_biguint().unwrap()).unwrap(); + + let client = Client::new(); + let response = client + .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) + .header("Content-Type", "text/plain; charset=utf-8") + .body(update_inputs_string(&inputs)) + .send() + .await + .map_err(|e| { + error!( + "get_batched_nullify_ix_data: failed to send proof to server: {:?}", + e + ); + ForesterUtilsError::ProverError("Failed to send proof to server".into()) + })?; + + let proof = if response.status().is_success() { + let body = response.text().await.unwrap(); + let proof_json = deserialize_gnark_proof_json(&body).unwrap(); + let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); + let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); + CompressedProof { + a: proof_a, + b: proof_b, + c: proof_c, + } + } else { + error!( + "get_batched_nullify_ix_data: failed to get proof from server: {:?}", + response.text().await + ); + return Err(ForesterUtilsError::ProverError("Failed to get proof from server".into())); + }; + + Ok(InstructionDataBatchNullifyInputs { + public_inputs: BatchProofInputsIx { + new_root, + old_root_index: old_root_index as u16, + }, + compressed_proof: proof, + }) +} \ No newline at end of file diff --git a/forester-utils/src/lib.rs b/forester-utils/src/lib.rs index fd66d2aff..8c7441508 100644 --- a/forester-utils/src/lib.rs +++ b/forester-utils/src/lib.rs @@ -18,6 +18,7 @@ pub mod address_merkle_tree_config; pub mod forester_epoch; pub mod indexer; pub mod registry; +pub mod instructions; pub fn create_account_instruction( payer: &Pubkey, diff --git a/forester/Cargo.toml b/forester/Cargo.toml index dc7ce01d8..e40b140b4 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -44,6 +44,8 @@ dashmap = "6.1.0" scopeguard = "1.2.0" light-client = { workspace = true } light-merkle-tree-metadata = { workspace = true } +log = "0.4.22" +anyhow = "1.0.94" [dev-dependencies] serial_test = "3.2.0" diff --git a/forester/package.json b/forester/package.json index e3528ac31..da8dfce32 100644 --- a/forester/package.json +++ b/forester/package.json @@ -5,7 +5,8 @@ "scripts": { "build": "cargo build", "test": "RUSTFLAGS=\"--cfg tokio_unstable -D warnings\" cargo test --package forester -- --test-threads=1 --nocapture", - "test-state-batch": "cargo test --package forester test_batched -- --test-threads=1 --nocapture", + "test-state-batched": "cargo test --package forester test_state_batched -- --test-threads=1 --nocapture", + "test-address-batched": "cargo test --package forester test_address_batched -- --test-threads=1 --nocapture", "docker:build": "docker build --tag forester -f Dockerfile .." }, "devDependencies": { diff --git a/forester/src/batch_operations.rs b/forester/src/batch_operations.rs new file mode 100644 index 000000000..3536574dc --- /dev/null +++ b/forester/src/batch_operations.rs @@ -0,0 +1,319 @@ +// use crate::{errors::ForesterError, Result}; +// use borsh::BorshSerialize; +// use forester_utils::forester_epoch::TreeType; +// use forester_utils::indexer::Indexer; +// use forester_utils::instructions::{ +// create_append_batch_ix_data, create_batch_update_address_tree_instruction_data, +// create_nullify_batch_ix_data, +// }; +// use light_batched_merkle_tree::batch::BatchState; +// use light_batched_merkle_tree::event::{BatchAppendEvent, BatchNullifyEvent}; +// use light_batched_merkle_tree::merkle_tree::{ +// BatchedMerkleTreeAccount, InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs, +// }; +// use light_batched_merkle_tree::queue::BatchedQueueAccount; +// use light_client::rpc::RpcConnection; +// use light_client::rpc_pool::SolanaRpcPool; +// use light_registry::account_compression_cpi::sdk::{ +// create_batch_append_instruction, create_batch_nullify_instruction, +// create_batch_update_address_tree_instruction, +// }; +// use solana_program::pubkey::Pubkey; +// use solana_sdk::signature::Keypair; +// use solana_sdk::signer::Signer; +// use std::sync::Arc; +// use tokio::sync::Mutex; +// use tracing::{debug, info, instrument}; +// +// #[derive(Debug)] +// pub struct BatchContext> { +// pub rpc_pool: Arc>, +// pub indexer: Arc>, +// pub authority: Keypair, +// pub derivation: Pubkey, +// pub epoch: u64, +// pub merkle_tree: Pubkey, +// pub output_queue: Pubkey, +// } +// +// #[derive(Debug)] +// pub struct BatchProcessor> { +// context: BatchContext, +// tree_type: TreeType, +// } +// +// impl> BatchProcessor { +// pub fn new(context: BatchContext, tree_type: TreeType) -> Self { +// Self { context, tree_type } +// } +// +// #[instrument(level = "debug", skip(self))] +// pub async fn process(&self) -> Result { +// if !self.verify_batch_ready().await { +// debug!("Batch is not ready for processing"); +// return Ok(0); +// } +// +// match self.tree_type { +// TreeType::BatchedAddress => { +// info!("Processing address batch"); +// self.process_address_batch().await +// } +// TreeType::BatchedState => { +// info!("Processing state batch"); +// self.process_state_batch().await +// } +// _ => Err(ForesterError::Custom(format!( +// "Unsupported tree type: {:?}", +// self.tree_type +// ))), +// } +// } +// +// #[instrument( +// level = "debug", +// skip(self), +// fields( +// epoch = self.context.epoch, +// tree = %self.context.merkle_tree +// ) +// )] +// async fn verify_batch_ready(&self) -> bool { +// let mut rpc = match self.context.rpc_pool.get_connection().await { +// Ok(rpc) => rpc, +// Err(_) => return false, +// }; +// +// let mut account = match rpc.get_account(self.context.merkle_tree).await { +// Ok(Some(account)) => account, +// _ => return false, +// }; +// +// let is_ready = { +// let merkle_tree = match self.tree_type { +// TreeType::BatchedAddress => BatchedMerkleTreeAccount::address_tree_from_bytes_mut( +// account.data.as_mut_slice(), +// ), +// TreeType::BatchedState => { +// BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) +// } +// _ => return false, +// }; +// +// if let Ok(tree) = merkle_tree { +// let batch_index = tree.get_metadata().queue_metadata.next_full_batch_index; +// let full_batch = tree.batches.get(batch_index as usize).unwrap(); +// +// full_batch.get_state() != BatchState::Inserted +// && full_batch.get_current_zkp_batch_index() > full_batch.get_num_inserted_zkps() +// } else { +// false +// } +// }; +// is_ready +// } +// +// async fn process_address_batch(&self) -> Result { +// info!("process_address_batch"); +// let (instruction_data, batch_size) = self +// .create_batch_update_address_tree_instruction_data() +// .await?; +// +// let instruction = create_batch_update_address_tree_instruction( +// self.context.authority.pubkey(), +// self.context.derivation, +// self.context.merkle_tree, +// self.context.epoch, +// instruction_data.try_to_vec()?, +// ); +// +// let mut rpc = self.context.rpc_pool.get_connection().await?; +// +// // TODO: should we do instead rpc.create_and_send_transaction_with_event::( +// rpc.create_and_send_transaction( +// &[instruction], +// &self.context.authority.pubkey(), +// &[&self.context.authority], +// ) +// .await?; +// +// let mut indexer = self.context.indexer.lock().await; +// indexer +// .finalize_batched_address_tree_update(&mut *rpc, self.context.merkle_tree) +// .await; +// +// Ok(batch_size) +// } +// +// async fn process_state_batch(&self) -> Result { +// info!("Performing state batch append operation"); +// let mut rpc = self.context.rpc_pool.get_connection().await?; +// +// let (num_inserted_zkps, zkp_batch_size) = { +// let mut output_queue_account = +// rpc.get_account(self.context.output_queue).await?.unwrap(); +// let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( +// output_queue_account.data.as_mut_slice(), +// ) +// .map_err(|e| ForesterError::Custom(e.to_string()))?; +// +// let batch_index = output_queue +// .get_metadata() +// .batch_metadata +// .next_full_batch_index; +// let zkp_batch_size = output_queue.get_metadata().batch_metadata.zkp_batch_size; +// +// ( +// output_queue.batches[batch_index as usize].get_num_inserted_zkps(), +// zkp_batch_size as usize, +// ) +// }; +// +// let instruction_data = self.create_append_batch_ix_data().await?; +// let instruction = create_batch_append_instruction( +// self.context.authority.pubkey(), +// self.context.derivation, +// self.context.merkle_tree, +// self.context.output_queue, +// self.context.epoch, +// instruction_data.try_to_vec()?, +// ); +// +// rpc.create_and_send_transaction_with_event::( +// &[instruction], +// &self.context.authority.pubkey(), +// &[&self.context.authority], +// None, +// ) +// .await?; +// +// info!("Updating indexer after append"); +// self.update_indexer_after_append(num_inserted_zkps).await?; +// info!("Indexer updated after append"); +// +// info!("Performing state batch nullify operation"); +// let instruction_data = self.create_nullify_batch_ix_data().await?; +// let instruction = create_batch_nullify_instruction( +// self.context.authority.pubkey(), +// self.context.derivation, +// self.context.merkle_tree, +// self.context.epoch, +// instruction_data.try_to_vec()?, +// ); +// rpc.create_and_send_transaction_with_event::( +// &[instruction], +// &self.context.authority.pubkey(), +// &[&self.context.authority], +// None, +// ) +// .await?; +// +// info!("Updating indexer after nullify"); +// self.update_indexer_after_nullify().await?; +// info!("Indexer updated after nullify"); +// +// Ok(zkp_batch_size * 2) +// } +// +// async fn update_indexer_after_append(&self, num_inserted_zkps: u64) -> Result<()> { +// let mut rpc = self.context.rpc_pool.get_connection().await?; +// let mut indexer = self.context.indexer.lock().await; +// +// indexer +// .update_test_indexer_after_append( +// &mut *rpc, +// self.context.merkle_tree, +// self.context.output_queue, +// num_inserted_zkps, +// ) +// .await; +// +// Ok(()) +// } +// +// async fn update_indexer_after_nullify(&self) -> Result<()> { +// let mut rpc = self.context.rpc_pool.get_connection().await?; +// let mut indexer = self.context.indexer.lock().await; +// +// let batch_index = { +// let mut account = rpc.get_account(self.context.merkle_tree).await?.unwrap(); +// let merkle_tree = +// BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) +// .map_err(|e| { +// ForesterError::Custom(format!("Failed to parse merkle tree account: {}", e)) +// })?; +// merkle_tree +// .get_metadata() +// .queue_metadata +// .next_full_batch_index +// }; +// +// indexer +// .update_test_indexer_after_nullification( +// &mut *rpc, +// self.context.merkle_tree, +// batch_index as usize, +// ) +// .await; +// +// Ok(()) +// } +// +// async fn create_batch_update_address_tree_instruction_data( +// &self, +// ) -> Result<(InstructionDataBatchNullifyInputs, usize)> { +// let mut rpc = self.context.rpc_pool.get_connection().await?; +// +// let mut indexer = self.context.indexer.lock().await; +// +// create_batch_update_address_tree_instruction_data( +// &mut *rpc, +// &mut *indexer, +// self.context.merkle_tree, +// ) +// .await +// .map_err(|e| ForesterError::Custom(e.to_string())) +// } +// +// async fn create_append_batch_ix_data(&self) -> Result { +// let mut rpc = self.context.rpc_pool.get_connection().await?; +// +// let mut indexer = self.context.indexer.lock().await; +// +// create_append_batch_ix_data( +// &mut *rpc, +// &mut *indexer, +// self.context.merkle_tree, +// self.context.output_queue, +// ) +// .await +// .map_err(|e| ForesterError::Custom(e.to_string())) +// } +// +// async fn create_nullify_batch_ix_data(&self) -> Result { +// let mut rpc = self.context.rpc_pool.get_connection().await?; +// +// let mut indexer = self.context.indexer.lock().await; +// +// create_nullify_batch_ix_data(&mut *rpc, &mut *indexer, self.context.merkle_tree) +// .await +// .map_err(|e| ForesterError::Custom(e.to_string())) +// } +// } +// +// #[instrument( +// level = "debug", +// fields( +// epoch = context.epoch, +// tree = %context.merkle_tree, +// tree_type = ?tree_type +// ) +// )] +// pub async fn process_batched_operations>( +// context: BatchContext, +// tree_type: TreeType, +// ) -> Result { +// info!("process_batched_operations"); +// let processor = BatchProcessor::new(context, tree_type); +// processor.process().await +// } diff --git a/forester/src/batch_processor/address.rs b/forester/src/batch_processor/address.rs new file mode 100644 index 000000000..f8b1243d2 --- /dev/null +++ b/forester/src/batch_processor/address.rs @@ -0,0 +1,54 @@ +use crate::batch_processor::error::{BatchProcessError, Result}; +use super::common::BatchContext; +use borsh::BorshSerialize; +use solana_sdk::signer::Signer; +use forester_utils::{indexer::Indexer, instructions::create_batch_update_address_tree_instruction_data}; +use light_registry::account_compression_cpi::sdk::create_batch_update_address_tree_instruction; +use light_client::rpc::RpcConnection; +use tracing::{info, instrument}; + +#[instrument(level = "debug", skip(context), fields(tree = %context.merkle_tree))] +pub(crate) async fn process_batch>( + context: &BatchContext, +) -> Result { + info!("Processing address batch operation"); + let mut rpc = context.rpc_pool.get_connection().await?; + + // Create instruction data and get batch size + let (instruction_data, batch_size) = create_batch_update_address_tree_instruction_data( + &mut *rpc, + &mut *context.indexer.lock().await, + context.merkle_tree, + ) + .await + .map_err(|e| BatchProcessError::InstructionData(e.to_string()))?; + + // Create the instruction + let instruction = create_batch_update_address_tree_instruction( + context.authority.pubkey(), + context.derivation, + context.merkle_tree, + context.epoch, + instruction_data.try_to_vec() + .map_err(|e| BatchProcessError::InstructionData(format!("Failed to serialize instruction data: {}", e)))?, + ); + + // TODO: send transaction with event? + // rpc.create_and_send_transaction_with_event::( + rpc.create_and_send_transaction( + &[instruction], + &context.authority.pubkey(), + &[&context.authority], + ) + .await + .map_err(|e| BatchProcessError::Transaction(format!("Failed to send address update transaction: {}", e)))?; + + // Update indexer state after successful transaction + let mut indexer = context.indexer.lock().await; + indexer + .finalize_batched_address_tree_update(&mut *rpc, context.merkle_tree) + .await; + + info!("Address batch processing completed successfully. Batch size: {}", batch_size); + Ok(batch_size) +} \ No newline at end of file diff --git a/forester/src/batch_processor/common.rs b/forester/src/batch_processor/common.rs new file mode 100644 index 000000000..f1de39d56 --- /dev/null +++ b/forester/src/batch_processor/common.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; +use forester_utils::{forester_epoch::TreeType, indexer::Indexer}; +use light_client::{rpc::RpcConnection, rpc_pool::SolanaRpcPool}; +use solana_program::pubkey::Pubkey; +use solana_sdk::signature::Keypair; +use tokio::sync::Mutex; +use light_batched_merkle_tree::{ + batch::BatchState, + merkle_tree::BatchedMerkleTreeAccount, +}; +use tracing::{debug, info, instrument}; + +use super::{address, state, error::Result, error}; + +#[derive(Debug)] +pub struct BatchContext> { + pub rpc_pool: Arc>, + pub indexer: Arc>, + pub authority: Keypair, + pub derivation: Pubkey, + pub epoch: u64, + pub merkle_tree: Pubkey, + pub output_queue: Pubkey, +} + +#[derive(Debug)] +pub struct BatchProcessor> { + context: BatchContext, + tree_type: TreeType, +} + +impl> BatchProcessor { + pub fn new(context: BatchContext, tree_type: TreeType) -> Self { + Self { context, tree_type } + } + + #[instrument(level = "debug", skip(self))] + pub async fn process(&self) -> Result { + if !self.verify_batch_ready().await { + debug!("Batch is not ready for processing"); + return Ok(0); + } + + match self.tree_type { + TreeType::BatchedAddress => { + info!("Processing address batch"); + address::process_batch(&self.context).await + } + TreeType::BatchedState => { + info!("Processing state batch"); + state::process_batch(&self.context).await + } + _ => Err(error::BatchProcessError::UnsupportedTreeType(self.tree_type)), + } + } + + async fn verify_batch_ready(&self) -> bool { + let mut rpc = match self.context.rpc_pool.get_connection().await { + Ok(rpc) => rpc, + Err(_) => return false, + }; + + let mut account = match rpc.get_account(self.context.merkle_tree).await { + Ok(Some(account)) => account, + _ => return false, + }; + + let is_ready = { + let merkle_tree = match self.tree_type { + TreeType::BatchedAddress => { + BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.data.as_mut_slice()) + } + TreeType::BatchedState => { + BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) + } + _ => return false, + }; + + if let Ok(tree) = merkle_tree { + let batch_index = tree.get_metadata().queue_metadata.next_full_batch_index; + let full_batch = tree.batches.get(batch_index as usize).unwrap(); + + full_batch.get_state() != BatchState::Inserted + && full_batch.get_current_zkp_batch_index() > full_batch.get_num_inserted_zkps() + } else { + false + } + }; + is_ready + } +} \ No newline at end of file diff --git a/forester/src/batch_processor/error.rs b/forester/src/batch_processor/error.rs new file mode 100644 index 000000000..d139da4b1 --- /dev/null +++ b/forester/src/batch_processor/error.rs @@ -0,0 +1,55 @@ +use solana_client::rpc_request::RpcError; +use forester_utils::forester_epoch::TreeType; +use thiserror::Error; +use light_client::rpc_pool::PoolError; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum BatchProcessError { + #[error("Failed to parse queue account: {0}")] + QueueParsing(String), + + #[error("Failed to parse merkle tree account: {0}")] + MerkleTreeParsing(String), + + #[error("Failed to create instruction data: {0}")] + InstructionData(String), + + #[error("Transaction failed: {0}")] + Transaction(String), + + #[error("RPC error: {0}")] + Rpc(String), + + #[error("Pool error: {0}")] + Pool(String), + + #[error("Indexer error: {0}")] + Indexer(String), + + #[error("Unsupported tree type: {0:?}")] + UnsupportedTreeType(TreeType), + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl From for BatchProcessError { + fn from(e: light_client::rpc::RpcError) -> Self { + Self::Rpc(e.to_string()) + } +} + + +impl From for BatchProcessError { + fn from(e: RpcError) -> Self { + Self::Rpc(e.to_string()) + } +} + +impl From for BatchProcessError { + fn from(e: PoolError) -> Self { + Self::Pool(e.to_string()) + } +} \ No newline at end of file diff --git a/forester/src/batch_processor/mod.rs b/forester/src/batch_processor/mod.rs new file mode 100644 index 000000000..a71205d27 --- /dev/null +++ b/forester/src/batch_processor/mod.rs @@ -0,0 +1,30 @@ +mod address; +mod state; +mod common; +mod error; + +use common::BatchProcessor; +use error::Result; +use tracing::{info, instrument}; +use forester_utils::{forester_epoch::TreeType, indexer::Indexer}; +use light_client::rpc::RpcConnection; + +#[instrument( + level = "debug", + fields( + epoch = context.epoch, + tree = %context.merkle_tree, + tree_type = ?tree_type + ) +)] +pub async fn process_batched_operations>( + context: BatchContext, + tree_type: TreeType, +) -> Result { + info!("process_batched_operations"); + let processor = BatchProcessor::new(context, tree_type); + processor.process().await +} + +pub use common::BatchContext; +pub use error::BatchProcessError; \ No newline at end of file diff --git a/forester/src/batch_processor/state.rs b/forester/src/batch_processor/state.rs new file mode 100644 index 000000000..3870cfc5c --- /dev/null +++ b/forester/src/batch_processor/state.rs @@ -0,0 +1,129 @@ +use crate::batch_processor::error::{BatchProcessError, Result}; +use super::common::BatchContext; +use borsh::BorshSerialize; +use solana_sdk::signer::Signer; +use forester_utils::{indexer::Indexer, instructions::{create_append_batch_ix_data, create_nullify_batch_ix_data}}; +use light_batched_merkle_tree::{ + event::{BatchAppendEvent, BatchNullifyEvent}, + queue::BatchedQueueAccount, +}; +use light_registry::account_compression_cpi::sdk::{create_batch_append_instruction, create_batch_nullify_instruction}; +use light_client::rpc::RpcConnection; +use tracing::info; + +pub(crate) async fn process_batch>( + context: &BatchContext, +) -> Result { + info!("Processing state batch append operation"); + let mut rpc = context.rpc_pool.get_connection().await?; + + let (num_inserted_zkps, zkp_batch_size) = { + let mut output_queue_account = rpc.get_account(context.output_queue).await?.unwrap(); + let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( + output_queue_account.data.as_mut_slice(), + ).map_err(|e| BatchProcessError::QueueParsing(e.to_string()))?; + + let batch_index = output_queue.get_metadata().batch_metadata.next_full_batch_index; + let zkp_batch_size = output_queue.get_metadata().batch_metadata.zkp_batch_size; + + ( + output_queue.batches[batch_index as usize].get_num_inserted_zkps(), + zkp_batch_size as usize, + ) + }; + + perform_append(context, &mut rpc, num_inserted_zkps).await?; + info!("Append operation completed"); + + perform_nullify(context, &mut rpc).await?; + info!("Nullify operation completed"); + + Ok(zkp_batch_size * 2) +} + +async fn perform_append>( + context: &BatchContext, + rpc: &mut R, + num_inserted_zkps: u64, +) -> Result<()> { + let instruction_data = create_append_batch_ix_data( + rpc, + &mut *context.indexer.lock().await, + context.merkle_tree, + context.output_queue, + ).await.map_err(|e| BatchProcessError::InstructionData(e.to_string()))?; + + let instruction = create_batch_append_instruction( + context.authority.pubkey(), + context.derivation, + context.merkle_tree, + context.output_queue, + context.epoch, + instruction_data.try_to_vec().map_err(|e| BatchProcessError::InstructionData(e.to_string()))?, + ); + + rpc.create_and_send_transaction_with_event::( + &[instruction], + &context.authority.pubkey(), + &[&context.authority], + None, + ).await?; + + let mut indexer = context.indexer.lock().await; + indexer.update_test_indexer_after_append( + rpc, + context.merkle_tree, + context.output_queue, + num_inserted_zkps, + ).await; + + Ok(()) +} + +async fn perform_nullify>( + context: &BatchContext, + rpc: &mut R, +) -> Result<()> { + let batch_index = get_batch_index(context, rpc).await?; + + let instruction_data = create_nullify_batch_ix_data( + rpc, + &mut *context.indexer.lock().await, + context.merkle_tree, + ).await.map_err(|e| BatchProcessError::InstructionData(e.to_string()))?; + + let instruction = create_batch_nullify_instruction( + context.authority.pubkey(), + context.derivation, + context.merkle_tree, + context.epoch, + instruction_data.try_to_vec().map_err(|e| BatchProcessError::InstructionData(e.to_string()))?, + ); + + rpc.create_and_send_transaction_with_event::( + &[instruction], + &context.authority.pubkey(), + &[&context.authority], + None, + ).await?; + + context.indexer.lock().await.update_test_indexer_after_nullification( + rpc, + context.merkle_tree, + batch_index, + ).await; + + Ok(()) +} + +async fn get_batch_index>( + context: &BatchContext, + rpc: &mut R, +) -> Result { + let mut account = rpc.get_account(context.merkle_tree).await?.unwrap(); + let merkle_tree = light_batched_merkle_tree::merkle_tree::BatchedMerkleTreeAccount::state_tree_from_bytes_mut( + account.data.as_mut_slice(), + ).map_err(|e| BatchProcessError::MerkleTreeParsing(e.to_string()))?; + + Ok(merkle_tree.get_metadata().queue_metadata.next_full_batch_index as usize) +} \ No newline at end of file diff --git a/forester/src/batched_address_ops.rs b/forester/src/batched_address_ops.rs new file mode 100644 index 000000000..d2bf3e55c --- /dev/null +++ b/forester/src/batched_address_ops.rs @@ -0,0 +1,313 @@ +// use std::sync::Arc; +// +// use borsh::BorshSerialize; +// use forester_utils::indexer::Indexer; +// use light_batched_merkle_tree::{ +// batch::BatchState, +// constants::DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, +// merkle_tree::{ +// BatchProofInputsIx, BatchedMerkleTreeAccount, InstructionDataBatchNullifyInputs, +// }, +// }; +// use light_client::{ +// rpc::{RpcConnection, RpcError}, +// rpc_pool::SolanaRpcPool, +// }; +// use light_prover_client::{ +// batch_address_append::get_batch_address_append_circuit_inputs, +// gnark::{ +// batch_address_append_json_formatter::to_json, +// constants::{PROVE_PATH, SERVER_ADDRESS}, +// proof_helpers::{compress_proof, deserialize_gnark_proof_json, proof_from_json_struct}, +// }, +// }; +// use light_registry::account_compression_cpi::sdk::create_batch_update_address_tree_instruction; +// use light_utils::bigint::bigint_to_be_bytes_array; +// use light_verifier::CompressedProof; +// use reqwest::Client; +// use solana_program::pubkey::Pubkey; +// use solana_sdk::{signature::Keypair, signer::Signer}; +// use tokio::sync::Mutex; +// use tracing::info; +// +// use crate::{errors::ForesterError, Result}; +// +// pub struct BatchedAddressOperations> { +// pub rpc_pool: Arc>, +// pub indexer: Arc>, +// pub authority: Keypair, +// pub derivation: Pubkey, +// pub epoch: u64, +// pub merkle_tree: Pubkey, +// pub output_queue: Pubkey, +// } +// impl> BatchedAddressOperations { +// async fn is_batch_ready(&self) -> bool { +// let mut rpc = self.rpc_pool.get_connection().await.unwrap(); +// let is_batch_ready = { +// let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); +// let merkle_tree = +// BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.data.as_mut_slice()) +// .unwrap(); +// let batch_index = merkle_tree +// .get_metadata() +// .queue_metadata +// .next_full_batch_index; +// let full_batch = merkle_tree.batches.get(batch_index as usize).unwrap(); +// +// info!("Batch state: {:?}", full_batch.get_state()); +// info!( +// "Current zkp batch index: {:?}", +// full_batch.get_current_zkp_batch_index() +// ); +// info!( +// "Num inserted zkps: {:?}", +// full_batch.get_num_inserted_zkps() +// ); +// +// full_batch.get_state() != BatchState::Inserted +// && full_batch.get_current_zkp_batch_index() > full_batch.get_num_inserted_zkps() +// }; +// is_batch_ready +// } +// +// pub async fn perform_batch_address_merkle_tree_update(&self) -> Result { +// info!("Performing batch address merkle tree update"); +// let mut rpc = self.rpc_pool.get_connection().await?; +// let (instruction_data, batch_size) = self +// .create_batch_update_address_tree_instruction_data_with_proof() +// .await?; +// +// let instruction = create_batch_update_address_tree_instruction( +// self.authority.pubkey(), +// self.derivation, +// self.merkle_tree, +// self.epoch, +// instruction_data.try_to_vec()?, +// ); +// let result = rpc +// .create_and_send_transaction( +// &[instruction], +// &self.authority.pubkey(), +// &[&self.authority], +// ) +// .await; +// match result { +// Ok(sig) => { +// info!("Batch address update sent with signature: {:?}", sig); +// self.finalize_batch_address_merkle_tree_update().await?; +// Ok(batch_size) +// } +// Err(e) => { +// info!("Failed to send batch address update: {:?}", e); +// Err(ForesterError::from(e)) +// } +// } +// } +// +// async fn finalize_batch_address_merkle_tree_update(&self) -> Result<()> { +// info!("Finalizing batch address merkle tree update"); +// let mut rpc = self.rpc_pool.get_connection().await?; +// self.indexer +// .lock() +// .await +// .finalize_batched_address_tree_update(&mut *rpc, self.merkle_tree) +// .await; +// +// Ok(()) +// } +// +// async fn create_batch_update_address_tree_instruction_data_with_proof( +// &self, +// ) -> Result<(InstructionDataBatchNullifyInputs, usize)> { +// let mut rpc = self.rpc_pool.get_connection().await?; +// +// let mut merkle_tree_account = rpc.get_account(self.merkle_tree).await?.unwrap(); +// +// let ( +// old_root_index, +// leaves_hashchain, +// start_index, +// current_root, +// batch_size, +// full_batch_index, +// ) = { +// let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( +// merkle_tree_account.data.as_mut_slice(), +// ) +// .unwrap(); +// +// let old_root_index = merkle_tree.root_history.last_index(); +// let full_batch_index = merkle_tree +// .get_metadata() +// .queue_metadata +// .next_full_batch_index; +// let batch = &merkle_tree.batches[full_batch_index as usize]; +// let zkp_batch_index = batch.get_num_inserted_zkps(); +// let leaves_hashchain = +// merkle_tree.hashchain_store[full_batch_index as usize][zkp_batch_index as usize]; +// let start_index = merkle_tree.get_metadata().next_index; +// let current_root = *merkle_tree.root_history.last().unwrap(); +// let batch_size = batch.zkp_batch_size as usize; +// +// ( +// old_root_index, +// leaves_hashchain, +// start_index, +// current_root, +// batch_size, +// full_batch_index, +// ) +// }; +// +// let batch_start_index = self +// .indexer +// .lock() +// .await +// .get_address_merkle_trees() +// .iter() +// .find(|x| x.accounts.merkle_tree == self.merkle_tree) +// .unwrap() +// .merkle_tree +// .merkle_tree +// .rightmost_index; +// +// let addresses = self +// .indexer +// .lock() +// .await +// .get_queue_elements( +// self.merkle_tree.to_bytes(), +// full_batch_index, +// 0, +// batch_size as u64, +// ) +// .await?; +// +// let batch_size = addresses.len(); +// +// // // local_leaves_hashchain is only used for a test assertion. +// // let local_nullifier_hashchain = create_hash_chain_from_array(&addresses); +// // assert_eq!(leaves_hashchain, local_nullifier_hashchain); +// +// // Get proof info after addresses are retrieved +// let non_inclusion_proofs = self +// .indexer +// .lock() +// .await +// .get_multiple_new_address_proofs_full(self.merkle_tree.to_bytes(), addresses.clone()) +// .await?; +// +// let mut low_element_values = Vec::new(); +// let mut low_element_indices = Vec::new(); +// let mut low_element_next_indices = Vec::new(); +// let mut low_element_next_values = Vec::new(); +// let mut low_element_proofs: Vec> = Vec::new(); +// +// for non_inclusion_proof in &non_inclusion_proofs { +// low_element_values.push(non_inclusion_proof.low_address_value); +// low_element_indices.push(non_inclusion_proof.low_address_index as usize); +// low_element_next_indices.push(non_inclusion_proof.low_address_next_index as usize); +// low_element_next_values.push(non_inclusion_proof.low_address_next_value); +// low_element_proofs.push(non_inclusion_proof.low_address_proof.to_vec()); +// } +// +// let subtrees = self +// .indexer +// .lock() +// .await +// .get_subtrees(self.merkle_tree.to_bytes()) +// .await? +// .try_into() +// .unwrap(); +// +// let inputs = get_batch_address_append_circuit_inputs::< +// { DEFAULT_BATCH_ADDRESS_TREE_HEIGHT as usize }, +// >( +// start_index as usize, +// current_root, +// low_element_values, +// low_element_next_values, +// low_element_indices, +// low_element_next_indices, +// low_element_proofs, +// addresses, +// subtrees, +// leaves_hashchain, +// batch_start_index, +// batch_size, +// ) +// .map_err(|e| { +// ForesterError::Custom(format!( +// "Can't create batch address append circuit inputs: {:?}", +// e.to_string() +// )) +// })?; +// +// let client = Client::new(); +// let circuit_inputs_new_root = bigint_to_be_bytes_array::<32>(&inputs.new_root).unwrap(); +// let inputs = to_json(&inputs); +// +// let response_result = client +// .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) +// .header("Content-Type", "text/plain; charset=utf-8") +// .body(inputs) +// .send() +// .await +// .expect("Failed to execute request."); +// +// if response_result.status().is_success() { +// let body = response_result.text().await.unwrap(); +// let proof_json = deserialize_gnark_proof_json(&body).unwrap(); +// let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); +// let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); +// let instruction_data = InstructionDataBatchNullifyInputs { +// public_inputs: BatchProofInputsIx { +// new_root: circuit_inputs_new_root, +// old_root_index: old_root_index as u16, +// }, +// compressed_proof: CompressedProof { +// a: proof_a, +// b: proof_b, +// c: proof_c, +// }, +// }; +// Ok((instruction_data, batch_size)) +// } else { +// Err(ForesterError::from(RpcError::CustomError( +// "Prover failed to generate proof".to_string(), +// ))) +// } +// } +// } +// +// pub async fn process_batched_address_operations>( +// rpc_pool: Arc>, +// indexer: Arc>, +// authority: Keypair, +// derivation: Pubkey, +// epoch: u64, +// merkle_tree: Pubkey, +// output_queue: Pubkey, +// ) -> Result { +// let ops = BatchedAddressOperations { +// rpc_pool, +// indexer, +// authority, +// derivation, +// epoch, +// merkle_tree, +// output_queue, +// }; +// +// info!("Processing batched address operations"); +// +// if ops.is_batch_ready().await { +// info!("Batch is ready"); +// let processed_count = ops.perform_batch_address_merkle_tree_update().await?; +// Ok(processed_count) +// } else { +// info!("Batch is not ready"); +// Ok(0) +// } +// } diff --git a/forester/src/batched_ops.rs b/forester/src/batched_ops.rs index a75d1558c..66bf89198 100644 --- a/forester/src/batched_ops.rs +++ b/forester/src/batched_ops.rs @@ -1,427 +1,427 @@ -use std::sync::Arc; - -use borsh::BorshSerialize; -use forester_utils::indexer::Indexer; -use light_batched_merkle_tree::{ - batch::BatchState, - constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, - event::{BatchAppendEvent, BatchNullifyEvent}, - merkle_tree::{ - AppendBatchProofInputsIx, BatchProofInputsIx, BatchedMerkleTreeAccount, - InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs, - }, - queue::BatchedQueueAccount, -}; -use light_client::{rpc::RpcConnection, rpc_pool::SolanaRpcPool}; -use light_hasher::{Hasher, Poseidon}; -use light_prover_client::{ - batch_append_with_proofs::get_batch_append_with_proofs_inputs, - batch_update::get_batch_update_inputs, - gnark::{ - batch_append_with_proofs_json_formatter::BatchAppendWithProofsInputsJson, - batch_update_json_formatter::update_inputs_string, - constants::{PROVE_PATH, SERVER_ADDRESS}, - proof_helpers::{compress_proof, deserialize_gnark_proof_json, proof_from_json_struct}, - }, -}; -use light_registry::account_compression_cpi::sdk::{ - create_batch_append_instruction, create_batch_nullify_instruction, -}; -use light_utils::bigint::bigint_to_be_bytes_array; -use light_verifier::CompressedProof; -use reqwest::Client; -use solana_program::pubkey::Pubkey; -use solana_sdk::{signature::Keypair, signer::Signer}; -use tokio::sync::Mutex; -use tracing::error; - -use crate::{errors::ForesterError, Result}; - -pub struct BatchedOperations> { - pub rpc_pool: Arc>, - pub indexer: Arc>, - pub authority: Keypair, - pub derivation: Pubkey, - pub epoch: u64, - pub merkle_tree: Pubkey, - pub output_queue: Pubkey, -} -impl> BatchedOperations { - async fn is_batch_ready(&self) -> bool { - let mut rpc = self.rpc_pool.get_connection().await.unwrap(); - let is_batch_ready = { - let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); - let merkle_tree = - BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) - .unwrap(); - let batch_index = merkle_tree - .get_metadata() - .queue_metadata - .next_full_batch_index; - let full_batch = merkle_tree.batches.get(batch_index as usize).unwrap(); - - full_batch.get_state() != BatchState::Inserted - && full_batch.get_current_zkp_batch_index() > full_batch.get_num_inserted_zkps() - }; - is_batch_ready - } - - pub async fn perform_batch_append(&self) -> Result { - let mut rpc = self.rpc_pool.get_connection().await?; - - let (num_inserted_zkps, batch_size) = { - let mut output_queue_account = - rpc.get_account(self.output_queue).await.unwrap().unwrap(); - let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( - output_queue_account.data.as_mut_slice(), - ) - .unwrap(); - let queue_metadata = output_queue.get_metadata(); - let batch_index = queue_metadata.batch_metadata.next_full_batch_index; - let num_inserted_zkps = - output_queue.batches[batch_index as usize].get_num_inserted_zkps(); - let zkp_batch_size = queue_metadata.batch_metadata.zkp_batch_size; - - (num_inserted_zkps, zkp_batch_size) - }; - - let instruction_data = self.create_append_batch_ix_data().await; - let instruction = create_batch_append_instruction( - self.authority.pubkey(), - self.derivation, - self.merkle_tree, - self.output_queue, - self.epoch, - instruction_data?.try_to_vec()?, - ); - - rpc.create_and_send_transaction_with_event::( - &[instruction], - &self.authority.pubkey(), - &[&self.authority], - None, - ) - .await?; - - self.indexer - .lock() - .await - .update_test_indexer_after_append( - &mut rpc, - self.merkle_tree, - self.output_queue, - num_inserted_zkps, - ) - .await; - Ok(batch_size as usize) - } - - pub async fn perform_batch_nullify(&self) -> Result { - let mut rpc = self.rpc_pool.get_connection().await?; - - let instruction_data = self.get_batched_nullify_ix_data().await?; - - let instruction = create_batch_nullify_instruction( - self.authority.pubkey(), - self.derivation, - self.merkle_tree, - self.epoch, - instruction_data.try_to_vec()?, - ); - - rpc.create_and_send_transaction_with_event::( - &[instruction], - &self.authority.pubkey(), - &[&self.authority], - None, - ) - .await?; - - let (batch_index, batch_size) = { - let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); - let merkle_tree = - BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) - .unwrap(); - ( - merkle_tree - .get_metadata() - .queue_metadata - .next_full_batch_index, - merkle_tree.get_metadata().queue_metadata.zkp_batch_size, - ) - }; - - self.indexer - .lock() - .await - .update_test_indexer_after_nullification( - &mut rpc, - self.merkle_tree, - batch_index as usize, - ) - .await; - Ok(batch_size as usize) - } - - async fn create_append_batch_ix_data(&self) -> Result { - let mut rpc = self.rpc_pool.get_connection().await.unwrap(); - - let (merkle_tree_next_index, current_root) = { - let mut merkle_tree_account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); - let merkle_tree = BatchedMerkleTreeAccount::state_tree_from_bytes_mut( - merkle_tree_account.data.as_mut_slice(), - ) - .unwrap(); - ( - merkle_tree.get_metadata().next_index, - *merkle_tree.root_history.last().unwrap(), - ) - }; - - let (zkp_batch_size, full_batch_index, num_inserted_zkps, leaves_hashchain) = { - let mut output_queue_account = - rpc.get_account(self.output_queue).await.unwrap().unwrap(); - let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( - output_queue_account.data.as_mut_slice(), - ) - .unwrap(); - - let queue_metadata = output_queue.get_metadata(); - let full_batch_index = queue_metadata.batch_metadata.next_full_batch_index; - let zkp_batch_size = queue_metadata.batch_metadata.zkp_batch_size; - - let num_inserted_zkps = - output_queue.batches[full_batch_index as usize].get_num_inserted_zkps(); - - let leaves_hashchain = - output_queue.hashchain_store[full_batch_index as usize][num_inserted_zkps as usize]; - - ( - zkp_batch_size, - full_batch_index, - num_inserted_zkps, - leaves_hashchain, - ) - }; - let start = num_inserted_zkps as usize * zkp_batch_size as usize; - let end = start + zkp_batch_size as usize; - - let leaves = self - .indexer - .lock() - .await - .get_queue_elements( - self.merkle_tree.to_bytes(), - full_batch_index, - start as u64, - end as u64, - ) - .await - .unwrap(); - - let (old_leaves, merkle_proofs) = { - let mut old_leaves = vec![]; - let mut merkle_proofs = vec![]; - let indices = (merkle_tree_next_index..merkle_tree_next_index + zkp_batch_size) - .collect::>(); - let proofs = self - .indexer - .lock() - .await - .get_proofs_by_indices(self.merkle_tree, &indices); - proofs.iter().for_each(|proof| { - old_leaves.push(proof.leaf); - merkle_proofs.push(proof.proof.clone()); - }); - - (old_leaves, merkle_proofs) - }; - - let (proof, new_root) = { - let circuit_inputs = get_batch_append_with_proofs_inputs::< - { DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }, - >( - current_root, - merkle_tree_next_index as u32, - leaves, - leaves_hashchain, - old_leaves, - merkle_proofs, - zkp_batch_size as u32, - ) - .unwrap(); - - let client = Client::new(); - let inputs_json = - BatchAppendWithProofsInputsJson::from_inputs(&circuit_inputs).to_string(); - - let response = client - .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) - .header("Content-Type", "text/plain; charset=utf-8") - .body(inputs_json) - .send() - .await - .expect("Failed to execute request."); - - if response.status().is_success() { - let body = response.text().await.unwrap(); - let proof_json = deserialize_gnark_proof_json(&body).unwrap(); - let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); - let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); - ( - CompressedProof { - a: proof_a, - b: proof_b, - c: proof_c, - }, - bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()) - .unwrap(), - ) - } else { - error!( - "create_append_batch_ix_data: failed to get proof from server: {:?}", - response.text().await - ); - return Err(ForesterError::Custom( - "Failed to get proof from server".into(), - )); - } - }; - - Ok(InstructionDataBatchAppendInputs { - public_inputs: AppendBatchProofInputsIx { new_root }, - compressed_proof: proof, - }) - } - - async fn get_batched_nullify_ix_data(&self) -> Result { - let mut rpc = self.rpc_pool.get_connection().await.unwrap(); - - let (zkp_batch_size, old_root, old_root_index, leaves_hashchain) = { - let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); - let merkle_tree = - BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) - .unwrap(); - let metadata = merkle_tree.get_metadata(); - let batch_idx = metadata.queue_metadata.next_full_batch_index as usize; - let zkp_size = metadata.queue_metadata.zkp_batch_size; - let batch = &merkle_tree.batches[batch_idx]; - let zkp_idx = batch.get_num_inserted_zkps(); - let hashchain = merkle_tree.hashchain_store[batch_idx][zkp_idx as usize]; - let root_idx = merkle_tree.root_history.last_index(); - let root = *merkle_tree.root_history.last().unwrap(); - (zkp_size, root, root_idx, hashchain) - }; - - let leaf_indices_tx_hashes = self - .indexer - .lock() - .await - .get_leaf_indices_tx_hashes(self.merkle_tree, zkp_batch_size as usize); - - let mut leaves = Vec::new(); - let mut tx_hashes = Vec::new(); - let mut old_leaves = Vec::new(); - let mut path_indices = Vec::new(); - let mut merkle_proofs = Vec::new(); - let mut nullifiers = Vec::new(); - - let proofs = self.indexer.lock().await.get_proofs_by_indices( - self.merkle_tree, - &leaf_indices_tx_hashes - .iter() - .map(|(index, _, _)| *index as u64) - .collect::>(), - ); - - for ((index, leaf, tx_hash), proof) in leaf_indices_tx_hashes.iter().zip(proofs.iter()) { - path_indices.push(*index); - leaves.push(*leaf); - old_leaves.push(proof.leaf); - merkle_proofs.push(proof.proof.clone()); - tx_hashes.push(*tx_hash); - let index_bytes = index.to_be_bytes(); - let nullifier = Poseidon::hashv(&[leaf, &index_bytes, tx_hash]).unwrap(); - nullifiers.push(nullifier); - } - - let inputs = get_batch_update_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( - old_root, - tx_hashes, - leaves.to_vec(), - leaves_hashchain, - old_leaves, - merkle_proofs, - path_indices, - zkp_batch_size as u32, - ) - .unwrap(); - - let new_root = - bigint_to_be_bytes_array::<32>(&inputs.new_root.to_biguint().unwrap()).unwrap(); - - let client = Client::new(); - let response = client - .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) - .header("Content-Type", "text/plain; charset=utf-8") - .body(update_inputs_string(&inputs)) - .send() - .await?; - - let proof = if response.status().is_success() { - let body = response.text().await.unwrap(); - let proof_json = deserialize_gnark_proof_json(&body).unwrap(); - let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); - let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); - CompressedProof { - a: proof_a, - b: proof_b, - c: proof_c, - } - } else { - error!( - "get_batched_nullify_ix_data: failed to get proof from server: {:?}", - response.text().await - ); - return Err(ForesterError::Custom( - "Failed to get proof from server".into(), - )); - }; - - Ok(InstructionDataBatchNullifyInputs { - public_inputs: BatchProofInputsIx { - new_root, - old_root_index: old_root_index as u16, - }, - compressed_proof: proof, - }) - } -} - -pub async fn process_batched_operations>( - rpc_pool: Arc>, - indexer: Arc>, - authority: Keypair, - derivation: Pubkey, - epoch: u64, - merkle_tree: Pubkey, - output_queue: Pubkey, -) -> Result { - let ops = BatchedOperations { - rpc_pool, - indexer, - authority, - derivation, - epoch, - merkle_tree, - output_queue, - }; - - if ops.is_batch_ready().await { - let processed_appends_count = ops.perform_batch_append().await?; - let processed_nullifications_count = ops.perform_batch_nullify().await?; - Ok(processed_appends_count + processed_nullifications_count) - } else { - Ok(0) - } -} +// use std::sync::Arc; +// +// use borsh::BorshSerialize; +// use forester_utils::indexer::Indexer; +// use light_batched_merkle_tree::{ +// batch::BatchState, +// constants::DEFAULT_BATCH_STATE_TREE_HEIGHT, +// event::{BatchAppendEvent, BatchNullifyEvent}, +// merkle_tree::{ +// AppendBatchProofInputsIx, BatchProofInputsIx, BatchedMerkleTreeAccount, +// InstructionDataBatchAppendInputs, InstructionDataBatchNullifyInputs, +// }, +// queue::BatchedQueueAccount, +// }; +// use light_client::{rpc::RpcConnection, rpc_pool::SolanaRpcPool}; +// use light_hasher::{Hasher, Poseidon}; +// use light_prover_client::{ +// batch_append_with_proofs::get_batch_append_with_proofs_inputs, +// batch_update::get_batch_update_inputs, +// gnark::{ +// batch_append_with_proofs_json_formatter::BatchAppendWithProofsInputsJson, +// batch_update_json_formatter::update_inputs_string, +// constants::{PROVE_PATH, SERVER_ADDRESS}, +// proof_helpers::{compress_proof, deserialize_gnark_proof_json, proof_from_json_struct}, +// }, +// }; +// use light_registry::account_compression_cpi::sdk::{ +// create_batch_append_instruction, create_batch_nullify_instruction, +// }; +// use light_utils::bigint::bigint_to_be_bytes_array; +// use light_verifier::CompressedProof; +// use reqwest::Client; +// use solana_program::pubkey::Pubkey; +// use solana_sdk::{signature::Keypair, signer::Signer}; +// use tokio::sync::Mutex; +// use tracing::error; +// +// use crate::{errors::ForesterError, Result}; +// +// pub struct BatchedOperations> { +// pub rpc_pool: Arc>, +// pub indexer: Arc>, +// pub authority: Keypair, +// pub derivation: Pubkey, +// pub epoch: u64, +// pub merkle_tree: Pubkey, +// pub output_queue: Pubkey, +// } +// impl> BatchedOperations { +// async fn is_batch_ready(&self) -> bool { +// let mut rpc = self.rpc_pool.get_connection().await.unwrap(); +// let is_batch_ready = { +// let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); +// let merkle_tree = +// BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) +// .unwrap(); +// let batch_index = merkle_tree +// .get_metadata() +// .queue_metadata +// .next_full_batch_index; +// let full_batch = merkle_tree.batches.get(batch_index as usize).unwrap(); +// +// full_batch.get_state() != BatchState::Inserted +// && full_batch.get_current_zkp_batch_index() > full_batch.get_num_inserted_zkps() +// }; +// is_batch_ready +// } +// +// pub async fn perform_batch_append(&self) -> Result { +// let mut rpc = self.rpc_pool.get_connection().await?; +// +// let (num_inserted_zkps, batch_size) = { +// let mut output_queue_account = +// rpc.get_account(self.output_queue).await.unwrap().unwrap(); +// let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( +// output_queue_account.data.as_mut_slice(), +// ) +// .unwrap(); +// let queue_metadata = output_queue.get_metadata(); +// let batch_index = queue_metadata.batch_metadata.next_full_batch_index; +// let num_inserted_zkps = +// output_queue.batches[batch_index as usize].get_num_inserted_zkps(); +// let zkp_batch_size = queue_metadata.batch_metadata.zkp_batch_size; +// +// (num_inserted_zkps, zkp_batch_size) +// }; +// +// let instruction_data = self.create_append_batch_ix_data().await; +// let instruction = create_batch_append_instruction( +// self.authority.pubkey(), +// self.derivation, +// self.merkle_tree, +// self.output_queue, +// self.epoch, +// instruction_data?.try_to_vec()?, +// ); +// +// rpc.create_and_send_transaction_with_event::( +// &[instruction], +// &self.authority.pubkey(), +// &[&self.authority], +// None, +// ) +// .await?; +// +// self.indexer +// .lock() +// .await +// .update_test_indexer_after_append( +// &mut rpc, +// self.merkle_tree, +// self.output_queue, +// num_inserted_zkps, +// ) +// .await; +// Ok(batch_size as usize) +// } +// +// pub async fn perform_batch_nullify(&self) -> Result { +// let mut rpc = self.rpc_pool.get_connection().await?; +// +// let instruction_data = self.get_batched_nullify_ix_data().await?; +// +// let instruction = create_batch_nullify_instruction( +// self.authority.pubkey(), +// self.derivation, +// self.merkle_tree, +// self.epoch, +// instruction_data.try_to_vec()?, +// ); +// +// rpc.create_and_send_transaction_with_event::( +// &[instruction], +// &self.authority.pubkey(), +// &[&self.authority], +// None, +// ) +// .await?; +// +// let (batch_index, batch_size) = { +// let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); +// let merkle_tree = +// BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) +// .unwrap(); +// ( +// merkle_tree +// .get_metadata() +// .queue_metadata +// .next_full_batch_index, +// merkle_tree.get_metadata().queue_metadata.zkp_batch_size, +// ) +// }; +// +// self.indexer +// .lock() +// .await +// .update_test_indexer_after_nullification( +// &mut rpc, +// self.merkle_tree, +// batch_index as usize, +// ) +// .await; +// Ok(batch_size as usize) +// } +// +// async fn create_append_batch_ix_data(&self) -> Result { +// let mut rpc = self.rpc_pool.get_connection().await.unwrap(); +// +// let (merkle_tree_next_index, current_root) = { +// let mut merkle_tree_account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); +// let merkle_tree = BatchedMerkleTreeAccount::state_tree_from_bytes_mut( +// merkle_tree_account.data.as_mut_slice(), +// ) +// .unwrap(); +// ( +// merkle_tree.get_metadata().next_index, +// *merkle_tree.root_history.last().unwrap(), +// ) +// }; +// +// let (zkp_batch_size, full_batch_index, num_inserted_zkps, leaves_hashchain) = { +// let mut output_queue_account = +// rpc.get_account(self.output_queue).await.unwrap().unwrap(); +// let output_queue = BatchedQueueAccount::output_queue_from_bytes_mut( +// output_queue_account.data.as_mut_slice(), +// ) +// .unwrap(); +// +// let queue_metadata = output_queue.get_metadata(); +// let full_batch_index = queue_metadata.batch_metadata.next_full_batch_index; +// let zkp_batch_size = queue_metadata.batch_metadata.zkp_batch_size; +// +// let num_inserted_zkps = +// output_queue.batches[full_batch_index as usize].get_num_inserted_zkps(); +// +// let leaves_hashchain = +// output_queue.hashchain_store[full_batch_index as usize][num_inserted_zkps as usize]; +// +// ( +// zkp_batch_size, +// full_batch_index, +// num_inserted_zkps, +// leaves_hashchain, +// ) +// }; +// let start = num_inserted_zkps as usize * zkp_batch_size as usize; +// let end = start + zkp_batch_size as usize; +// +// let leaves = self +// .indexer +// .lock() +// .await +// .get_queue_elements( +// self.merkle_tree.to_bytes(), +// full_batch_index, +// start as u64, +// end as u64, +// ) +// .await +// .unwrap(); +// +// let (old_leaves, merkle_proofs) = { +// let mut old_leaves = vec![]; +// let mut merkle_proofs = vec![]; +// let indices = (merkle_tree_next_index..merkle_tree_next_index + zkp_batch_size) +// .collect::>(); +// let proofs = self +// .indexer +// .lock() +// .await +// .get_proofs_by_indices(self.merkle_tree, &indices); +// proofs.iter().for_each(|proof| { +// old_leaves.push(proof.leaf); +// merkle_proofs.push(proof.proof.clone()); +// }); +// +// (old_leaves, merkle_proofs) +// }; +// +// let (proof, new_root) = { +// let circuit_inputs = get_batch_append_with_proofs_inputs::< +// { DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }, +// >( +// current_root, +// merkle_tree_next_index as u32, +// leaves, +// leaves_hashchain, +// old_leaves, +// merkle_proofs, +// zkp_batch_size as u32, +// ) +// .unwrap(); +// +// let client = Client::new(); +// let inputs_json = +// BatchAppendWithProofsInputsJson::from_inputs(&circuit_inputs).to_string(); +// +// let response = client +// .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) +// .header("Content-Type", "text/plain; charset=utf-8") +// .body(inputs_json) +// .send() +// .await +// .expect("Failed to execute request."); +// +// if response.status().is_success() { +// let body = response.text().await.unwrap(); +// let proof_json = deserialize_gnark_proof_json(&body).unwrap(); +// let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); +// let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); +// ( +// CompressedProof { +// a: proof_a, +// b: proof_b, +// c: proof_c, +// }, +// bigint_to_be_bytes_array::<32>(&circuit_inputs.new_root.to_biguint().unwrap()) +// .unwrap(), +// ) +// } else { +// error!( +// "create_append_batch_ix_data: failed to get proof from server: {:?}", +// response.text().await +// ); +// return Err(ForesterError::Custom( +// "Failed to get proof from server".into(), +// )); +// } +// }; +// +// Ok(InstructionDataBatchAppendInputs { +// public_inputs: AppendBatchProofInputsIx { new_root }, +// compressed_proof: proof, +// }) +// } +// +// async fn get_batched_nullify_ix_data(&self) -> Result { +// let mut rpc = self.rpc_pool.get_connection().await.unwrap(); +// +// let (zkp_batch_size, old_root, old_root_index, leaves_hashchain) = { +// let mut account = rpc.get_account(self.merkle_tree).await.unwrap().unwrap(); +// let merkle_tree = +// BatchedMerkleTreeAccount::state_tree_from_bytes_mut(account.data.as_mut_slice()) +// .unwrap(); +// let metadata = merkle_tree.get_metadata(); +// let batch_idx = metadata.queue_metadata.next_full_batch_index as usize; +// let zkp_size = metadata.queue_metadata.zkp_batch_size; +// let batch = &merkle_tree.batches[batch_idx]; +// let zkp_idx = batch.get_num_inserted_zkps(); +// let hashchain = merkle_tree.hashchain_store[batch_idx][zkp_idx as usize]; +// let root_idx = merkle_tree.root_history.last_index(); +// let root = *merkle_tree.root_history.last().unwrap(); +// (zkp_size, root, root_idx, hashchain) +// }; +// +// let leaf_indices_tx_hashes = self +// .indexer +// .lock() +// .await +// .get_leaf_indices_tx_hashes(self.merkle_tree, zkp_batch_size as usize); +// +// let mut leaves = Vec::new(); +// let mut tx_hashes = Vec::new(); +// let mut old_leaves = Vec::new(); +// let mut path_indices = Vec::new(); +// let mut merkle_proofs = Vec::new(); +// let mut nullifiers = Vec::new(); +// +// let proofs = self.indexer.lock().await.get_proofs_by_indices( +// self.merkle_tree, +// &leaf_indices_tx_hashes +// .iter() +// .map(|(index, _, _)| *index as u64) +// .collect::>(), +// ); +// +// for ((index, leaf, tx_hash), proof) in leaf_indices_tx_hashes.iter().zip(proofs.iter()) { +// path_indices.push(*index); +// leaves.push(*leaf); +// old_leaves.push(proof.leaf); +// merkle_proofs.push(proof.proof.clone()); +// tx_hashes.push(*tx_hash); +// let index_bytes = index.to_be_bytes(); +// let nullifier = Poseidon::hashv(&[leaf, &index_bytes, tx_hash]).unwrap(); +// nullifiers.push(nullifier); +// } +// +// let inputs = get_batch_update_inputs::<{ DEFAULT_BATCH_STATE_TREE_HEIGHT as usize }>( +// old_root, +// tx_hashes, +// leaves.to_vec(), +// leaves_hashchain, +// old_leaves, +// merkle_proofs, +// path_indices, +// zkp_batch_size as u32, +// ) +// .unwrap(); +// +// let new_root = +// bigint_to_be_bytes_array::<32>(&inputs.new_root.to_biguint().unwrap()).unwrap(); +// +// let client = Client::new(); +// let response = client +// .post(format!("{}{}", SERVER_ADDRESS, PROVE_PATH)) +// .header("Content-Type", "text/plain; charset=utf-8") +// .body(update_inputs_string(&inputs)) +// .send() +// .await?; +// +// let proof = if response.status().is_success() { +// let body = response.text().await.unwrap(); +// let proof_json = deserialize_gnark_proof_json(&body).unwrap(); +// let (proof_a, proof_b, proof_c) = proof_from_json_struct(proof_json); +// let (proof_a, proof_b, proof_c) = compress_proof(&proof_a, &proof_b, &proof_c); +// CompressedProof { +// a: proof_a, +// b: proof_b, +// c: proof_c, +// } +// } else { +// error!( +// "get_batched_nullify_ix_data: failed to get proof from server: {:?}", +// response.text().await +// ); +// return Err(ForesterError::Custom( +// "Failed to get proof from server".into(), +// )); +// }; +// +// Ok(InstructionDataBatchNullifyInputs { +// public_inputs: BatchProofInputsIx { +// new_root, +// old_root_index: old_root_index as u16, +// }, +// compressed_proof: proof, +// }) +// } +// } +// +// pub async fn process_batched_operations>( +// rpc_pool: Arc>, +// indexer: Arc>, +// authority: Keypair, +// derivation: Pubkey, +// epoch: u64, +// merkle_tree: Pubkey, +// output_queue: Pubkey, +// ) -> Result { +// let ops = BatchedOperations { +// rpc_pool, +// indexer, +// authority, +// derivation, +// epoch, +// merkle_tree, +// output_queue, +// }; +// +// if ops.is_batch_ready().await { +// let processed_appends_count = ops.perform_batch_append().await?; +// let processed_nullifications_count = ops.perform_batch_nullify().await?; +// Ok(processed_appends_count + processed_nullifications_count) +// } else { +// Ok(0) +// } +// } diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index e342073e1..8f0cac6a8 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -34,7 +34,6 @@ use tokio::{ use tracing::{debug, error, info, info_span, instrument, warn}; use crate::{ - batched_ops::process_batched_operations, errors::ForesterError, metrics::{push_metrics, queue_metric_update, update_forester_sol_balance}, pagerduty::send_pagerduty_alert, @@ -51,6 +50,7 @@ use crate::{ tree_finder::TreeFinder, ForesterConfig, ForesterEpochInfo, Result, }; +use crate::batch_processor::{process_batched_operations, BatchContext}; #[derive(Clone, Debug)] pub struct WorkReport { @@ -717,6 +717,7 @@ impl> EpochManager { let slot = rpc.get_slot().await?; let trees = self.trees.lock().await; + info!("Adding schedule for trees: {:?}", *trees); epoch_info.add_trees_with_schedule(&trees, slot); info!("Finished waiting for active phase"); Ok(epoch_info) @@ -747,12 +748,15 @@ impl> EpochManager { let mut handles: Vec>> = Vec::new(); - debug!( + info!( "Creating threads for tree processing. Trees: {:?}", epoch_info.trees ); for tree in epoch_info.trees.iter() { - info!("Creating thread for queue {}", tree.tree_accounts.queue); + info!( + "Creating thread for tree {}", + tree.tree_accounts.merkle_tree + ); let self_clone = self_arc.clone(); let epoch_info_clone = epoch_info_arc.clone(); let tree = tree.clone(); @@ -811,8 +815,8 @@ impl> EpochManager { epoch_pda: &ForesterEpochPda, mut tree: TreeForesterSchedule, ) -> Result<()> { - debug!("enter process_queue"); - debug!("Tree schedule slots: {:?}", tree.slots); + info!("enter process_queue"); + info!("Tree schedule slots: {:?}", tree.slots); // TODO: sync at some point let mut estimated_slot = self.slot_tracker.estimated_current_slot(); @@ -830,7 +834,11 @@ impl> EpochManager { .find(|(_, slot)| slot.is_some()); if let Some((index, forester_slot)) = index_and_forester_slot { - debug!("Found eligible slot"); + info!( + "Found eligible slot, index: {}, tree: {}", + index, + tree.tree_accounts.merkle_tree.to_string() + ); let forester_slot = forester_slot.as_ref().unwrap().clone(); tree.slots.remove(index); @@ -854,32 +862,45 @@ impl> EpochManager { })? }; - if tree.tree_accounts.tree_type == TreeType::BatchedState { + if tree.tree_accounts.tree_type == TreeType::BatchedState || + tree.tree_accounts.tree_type == TreeType::BatchedAddress { + + info!("Processing batched operations for tree {:?} ({:?})", + tree.tree_accounts.tree_type, + tree.tree_accounts.merkle_tree); + + let batch_context = BatchContext { + rpc_pool: self.rpc_pool.clone(), + indexer: self.indexer.clone(), + authority: self.config.payer_keypair.insecure_clone(), + derivation: self.config.derivation_pubkey, + epoch: epoch_info.epoch, + merkle_tree: tree.tree_accounts.merkle_tree, + output_queue: tree.tree_accounts.queue, + }; + let start_time = Instant::now(); - info!("Processing batched state operations"); - - let rpc_pool = self.rpc_pool.clone(); - let indexer = self.indexer.clone(); - let payer = self.config.payer_keypair.insecure_clone(); - let derivation = self.config.derivation_pubkey; - let merkle_tree = tree.tree_accounts.merkle_tree; - let queue = tree.tree_accounts.queue; - - // TODO: measure & spawn child task for processing batched state operations - let processed_count = process_batched_operations( - rpc_pool, - indexer, - payer, - derivation, - epoch_info.epoch, - merkle_tree, - queue, - ) - .await?; - info!("Processed {} batched state operations", processed_count); - queue_metric_update(epoch_info.epoch, 1, start_time.elapsed()).await; - self.increment_processed_items_count(epoch_info.epoch, processed_count) - .await; + + match process_batched_operations(batch_context, tree.tree_accounts.tree_type).await { + Ok(processed_count) => { + info!( + "Processed {} operations for tree type {:?}", + processed_count, + tree.tree_accounts.tree_type + ); + queue_metric_update(epoch_info.epoch, processed_count, start_time.elapsed()) + .await; + self.increment_processed_items_count(epoch_info.epoch, processed_count) + .await; + } + Err(e) => { + error!( + "Failed to process batched operations for tree {:?}: {:?}", + tree.tree_accounts.merkle_tree, e + ); + return Err(e.into()); + } + } } else { // TODO: measure accuracy // Optional replace with shutdown signal for all child processes diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index 60622e787..28adbc9c0 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -164,6 +164,7 @@ pub async fn fetch_forester_status(args: &StatusArgs) { TreeType::State => "State", TreeType::Address => "Address", TreeType::BatchedState => "BatchedState", + TreeType::BatchedAddress => "BatchedAddress", } ); let tree_info = get_tree_fullness(&mut rpc, tree.merkle_tree, tree.tree_type) diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 4c7c88880..a0a8d91ef 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -1,5 +1,4 @@ -pub type Result = std::result::Result; - +pub mod batched_address_ops; pub mod batched_ops; pub mod cli; pub mod config; @@ -18,6 +17,8 @@ pub mod telemetry; pub mod tree_data_sync; pub mod tree_finder; pub mod utils; +pub mod batch_operations; +mod batch_processor; use std::{sync::Arc, time::Duration}; diff --git a/forester/src/rollover/operations.rs b/forester/src/rollover/operations.rs index d9647dcd5..f3bfd5e70 100644 --- a/forester/src/rollover/operations.rs +++ b/forester/src/rollover/operations.rs @@ -167,6 +167,68 @@ pub async fn get_tree_fullness( threshold, }) } + + TreeType::BatchedAddress => { + let mut account = rpc.get_account(tree_pubkey).await?.unwrap(); + let merkle_tree = + BatchedMerkleTreeAccount::state_tree_from_bytes_mut(&mut account.data).unwrap(); + println!( + "merkle_tree.get_account().queue.batch_size: {:?}", + merkle_tree.get_metadata().queue_metadata.batch_size + ); + + println!( + "queue currently_processing_batch_index: {:?}", + merkle_tree + .get_metadata() + .queue_metadata + .currently_processing_batch_index as usize + ); + + println!( + "queue batch_size: {:?}", + merkle_tree.get_metadata().queue_metadata.batch_size + ); + println!( + "queue zkp_batch_size: {:?}", + merkle_tree.get_metadata().queue_metadata.zkp_batch_size + ); + println!( + "queue next_full_batch_index: {:?}", + merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index + ); + println!( + "queue bloom_filter_capacity: {:?}", + merkle_tree + .get_metadata() + .queue_metadata + .bloom_filter_capacity + ); + println!( + "queue num_batches: {:?}", + merkle_tree.get_metadata().queue_metadata.num_batches + ); + + println!( + "tree next_index: {:?}", + merkle_tree.get_metadata().next_index + ); + println!("tree height: {:?}", merkle_tree.get_metadata().height); + + // TODO: implement + let threshold = 0; + let next_index = 0; + let fullness = 0.0; + + Ok(TreeInfo { + fullness, + next_index, + threshold, + }) + } } } diff --git a/forester/src/tree_data_sync.rs b/forester/src/tree_data_sync.rs index 941aab2c2..c08ea1050 100644 --- a/forester/src/tree_data_sync.rs +++ b/forester/src/tree_data_sync.rs @@ -27,6 +27,7 @@ fn process_account(pubkey: Pubkey, mut account: Account) -> Option process_state_account(&account, pubkey) .or_else(|_| process_batch_state_account(&mut account, pubkey)) .or_else(|_| process_address_account(&account, pubkey)) + .or_else(|_| process_batch_address_account(&mut account, pubkey)) .ok() } @@ -40,6 +41,16 @@ fn process_state_account(account: &Account, pubkey: Pubkey) -> Result Result { + check_discriminator::(&account.data)?; + let tree_account = AddressMerkleTreeAccount::deserialize(&mut &account.data[8..])?; + Ok(create_tree_accounts( + pubkey, + &tree_account.metadata, + TreeType::Address, + )) +} + fn process_batch_state_account(account: &mut Account, pubkey: Pubkey) -> Result { let tree_account = BatchedMerkleTreeAccount::state_tree_from_bytes_mut(&mut account.data) .map_err(|e| { @@ -52,13 +63,18 @@ fn process_batch_state_account(account: &mut Account, pubkey: Pubkey) -> Result< )) } -fn process_address_account(account: &Account, pubkey: Pubkey) -> Result { - check_discriminator::(&account.data)?; - let tree_account = AddressMerkleTreeAccount::deserialize(&mut &account.data[8..])?; +fn process_batch_address_account(account: &mut Account, pubkey: Pubkey) -> Result { + let tree_account = BatchedMerkleTreeAccount::address_tree_from_bytes_mut(&mut account.data) + .map_err(|e| { + ForesterError::Custom(format!( + "Failed to deserialize address tree account: {:?}", + e + )) + })?; Ok(create_tree_accounts( pubkey, - &tree_account.metadata, - TreeType::Address, + &tree_account.get_metadata().metadata, + TreeType::BatchedAddress, )) } diff --git a/forester/tests/batched_address_test.rs b/forester/tests/batched_address_test.rs new file mode 100644 index 000000000..b110a51bd --- /dev/null +++ b/forester/tests/batched_address_test.rs @@ -0,0 +1,255 @@ +use std::{sync::Arc, time::Duration}; + +use forester::run_pipeline; +use forester_utils::{ + indexer::AddressMerkleTreeAccounts, + registry::{register_test_forester, update_test_forester}, +}; +use light_batched_merkle_tree::{ + initialize_address_tree::InitAddressTreeAccountsInstructionData, + merkle_tree::BatchedMerkleTreeAccount, +}; +use light_client::{ + rpc::{solana_rpc::SolanaRpcUrl, RpcConnection, SolanaRpcConnection}, + rpc_pool::SolanaRpcPool, +}; +use light_program_test::test_env::EnvAccounts; +use light_prover_client::gnark::helpers::{LightValidatorConfig, ProverConfig, ProverMode}; +use light_test_utils::{ + create_address_test_program_sdk::perform_create_pda_with_event_rnd, e2e_test_env::E2ETestEnv, + indexer::TestIndexer, +}; +use solana_program::native_token::LAMPORTS_PER_SOL; +use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair, signer::Signer}; +use tokio::{ + sync::{mpsc, oneshot, Mutex}, + time::{sleep, timeout}, +}; +use tracing::log::info; + +use crate::test_utils::{forester_config, general_action_config, init, keypair_action_config}; + +mod test_utils; + +#[tokio::test(flavor = "multi_thread", worker_threads = 32)] +async fn test_address_batched() { + init(Some(LightValidatorConfig { + enable_indexer: false, + wait_time: 15, + prover_config: Some(ProverConfig { + run_mode: Some(ProverMode::ForesterTest), + circuits: vec![], + }), + sbf_programs: vec![( + "FNt7byTHev1k5x2cXZLBr8TdWiC3zoP5vcnZR4P682Uy".to_string(), + "../target/deploy/create_address_test_program.so".to_string(), + )], + })) + .await; + + let tree_params = InitAddressTreeAccountsInstructionData::test_default(); + + let forester_keypair = Keypair::new(); + let mut env_accounts = EnvAccounts::get_local_test_validator_accounts(); + env_accounts.forester = forester_keypair.insecure_clone(); + + let mut config = forester_config(); + config.payer_keypair = forester_keypair.insecure_clone(); + + let pool = SolanaRpcPool::::new( + config.external_services.rpc_url.to_string(), + CommitmentConfig::processed(), + config.general_config.rpc_pool_size as u32, + ) + .await + .unwrap(); + + let commitment_config = CommitmentConfig::confirmed(); + let mut rpc = SolanaRpcConnection::new(SolanaRpcUrl::Localnet, Some(commitment_config)); + rpc.payer = forester_keypair.insecure_clone(); + + rpc.airdrop_lamports(&forester_keypair.pubkey(), LAMPORTS_PER_SOL * 100_000) + .await + .unwrap(); + + rpc.airdrop_lamports( + &env_accounts.governance_authority.pubkey(), + LAMPORTS_PER_SOL * 100_000, + ) + .await + .unwrap(); + + register_test_forester( + &mut rpc, + &env_accounts.governance_authority, + &forester_keypair.pubkey(), + light_registry::ForesterConfig::default(), + ) + .await + .unwrap(); + + let new_forester_keypair = Keypair::new(); + rpc.airdrop_lamports(&new_forester_keypair.pubkey(), LAMPORTS_PER_SOL * 100_000) + .await + .unwrap(); + + update_test_forester( + &mut rpc, + &forester_keypair, + &forester_keypair.pubkey(), + Some(&new_forester_keypair), + light_registry::ForesterConfig::default(), + ) + .await + .unwrap(); + + config.derivation_pubkey = forester_keypair.pubkey(); + config.payer_keypair = new_forester_keypair.insecure_clone(); + + let config = Arc::new(config); + + let indexer: TestIndexer = + TestIndexer::init_from_env(&config.payer_keypair, &env_accounts, None).await; + + let mut env = E2ETestEnv::>::new( + rpc, + indexer, + &env_accounts, + keypair_action_config(), + general_action_config(), + 0, + Some(0), + ) + .await; + + let address_trees: Vec = env + .indexer + .address_merkle_trees + .iter() + .map(|x| x.accounts) + .collect(); + + println!("Address trees: {:?}", address_trees); + for tree in address_trees { + let is_v2 = tree.merkle_tree == tree.queue; + println!("Tree {:?} is_v2: {}", tree, is_v2); + } + + println!("Removing trees..."); + env.indexer.address_merkle_trees.clear(); + + println!("Creating new address batch tree..."); + { + let new_merkle_tree = Keypair::new(); + // let test_tree_params = InitAddressTreeAccountsInstructionData::default(); + // // test_tree_params.network_fee = Some(1); + // let result = + // create_batch_address_merkle_tree(&mut env.rpc, &env.payer, &new_merkle_tree, test_tree_params) + // .await; + env.indexer + .add_address_merkle_tree(&mut env.rpc, &new_merkle_tree, &new_merkle_tree, None, 2) + .await; + env_accounts.batch_address_merkle_tree = new_merkle_tree.pubkey(); + } + + let address_trees: Vec = env + .indexer + .address_merkle_trees + .iter() + .map(|x| x.accounts) + .collect(); + + println!("New address trees: {:?}", address_trees); + for tree in address_trees { + let is_v2 = tree.merkle_tree == tree.queue; + println!("Tree {:?} is_v2: {}", tree, is_v2); + } + + for i in 0..50 { + println!("===================== tx {} =====================", i); + // env.create_address(None, Some(0)).await; + + perform_create_pda_with_event_rnd( + &mut env.indexer, + &mut env.rpc, + &env_accounts, + &env.payer, + ) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + } + + let merkle_tree_pubkey = env.indexer.address_merkle_trees[0].accounts.merkle_tree; + + let zkp_batches = tree_params.input_queue_batch_size / tree_params.input_queue_zkp_batch_size; + + println!("zkp_batches: {}", zkp_batches); + + let pre_root = { + let mut rpc = pool.get_connection().await.unwrap(); + let mut merkle_tree_account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + + let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + merkle_tree.get_root().unwrap() + }; + + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + let (work_report_sender, mut work_report_receiver) = mpsc::channel(100); + + let service_handle = tokio::spawn(run_pipeline( + Arc::from(config.clone()), + Arc::new(Mutex::new(env.indexer)), + shutdown_receiver, + work_report_sender, + )); + + let timeout_duration = Duration::from_secs(60 * 10); + match timeout(timeout_duration, work_report_receiver.recv()).await { + Ok(Some(report)) => { + info!("Received work report: {:?}", report); + assert!(report.processed_items > 0, "No items were processed"); + } + Ok(None) => panic!("Work report channel closed unexpectedly"), + Err(_) => panic!("Test timed out after {:?}", timeout_duration), + } + + let mut rpc = pool.get_connection().await.unwrap(); + let mut merkle_tree_account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + + let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + + assert!( + merkle_tree + .get_metadata() + .queue_metadata + .next_full_batch_index + > 0, + "No batches were processed" + ); + + let post_root = { + let mut rpc = pool.get_connection().await.unwrap(); + let mut merkle_tree_account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + + let merkle_tree = BatchedMerkleTreeAccount::address_tree_from_bytes_mut( + merkle_tree_account.data.as_mut_slice(), + ) + .unwrap(); + merkle_tree.get_root().unwrap() + }; + + assert_ne!(pre_root, post_root, "Roots are the same"); + + shutdown_sender + .send(()) + .expect("Failed to send shutdown signal"); + service_handle.await.unwrap().unwrap(); +} diff --git a/forester/tests/batched_ops_test.rs b/forester/tests/batched_state_test.rs similarity index 99% rename from forester/tests/batched_ops_test.rs rename to forester/tests/batched_state_test.rs index c3a93a27a..59c68f2de 100644 --- a/forester/tests/batched_ops_test.rs +++ b/forester/tests/batched_state_test.rs @@ -31,7 +31,7 @@ use crate::test_utils::{forester_config, init}; mod test_utils; #[tokio::test(flavor = "multi_thread", worker_threads = 32)] -async fn test_batched() { +async fn test_state_batched() { let devnet = false; let tree_params = if devnet { InitStateTreeAccountsInstructionData::default() @@ -43,6 +43,7 @@ async fn test_batched() { enable_indexer: false, wait_time: 15, prover_config: None, + sbf_programs: vec![], })) .await; diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index a8357821b..f30045dd1 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -51,6 +51,7 @@ async fn test_epoch_monitor_with_test_indexer_and_1_forester() { enable_indexer: false, wait_time: 10, prover_config: None, + sbf_programs: vec![], })) .await; @@ -293,6 +294,7 @@ async fn test_epoch_monitor_with_2_foresters() { enable_indexer: false, wait_time: 40, prover_config: None, + sbf_programs: vec![], })) .await; let forester_keypair1 = Keypair::new(); @@ -641,6 +643,7 @@ async fn test_epoch_double_registration() { enable_indexer: false, wait_time: 10, prover_config: None, + sbf_programs: vec![], })) .await; diff --git a/program-tests/utils/src/create_address_test_program_sdk.rs b/program-tests/utils/src/create_address_test_program_sdk.rs index 55de65797..b9b962a51 100644 --- a/program-tests/utils/src/create_address_test_program_sdk.rs +++ b/program-tests/utils/src/create_address_test_program_sdk.rs @@ -132,7 +132,11 @@ pub async fn perform_create_pda_with_event( registered_program_pda: &env.registered_program_pda, }; let instruction = create_pda_instruction(create_ix_inputs); - let pre_test_indexer_queue_len = test_indexer.address_merkle_trees[1].queue_elements.len(); + let pre_test_indexer_queue_len = test_indexer + .get_address_merkle_tree(env.batch_address_merkle_tree) + .unwrap() + .queue_elements + .len(); let event = rpc .create_and_send_transaction_with_event(&[instruction], &payer.pubkey(), &[payer], None) .await? @@ -140,7 +144,11 @@ pub async fn perform_create_pda_with_event( let slot: u64 = rpc.get_slot().await.unwrap(); test_indexer.add_compressed_accounts_with_token_data(slot, &event.0); assert_eq!( - test_indexer.address_merkle_trees[1].queue_elements.len(), + test_indexer + .get_address_merkle_tree(env.batch_address_merkle_tree) + .unwrap() + .queue_elements + .len(), pre_test_indexer_queue_len + 1 ); Ok(()) diff --git a/program-tests/utils/src/indexer/test_indexer.rs b/program-tests/utils/src/indexer/test_indexer.rs index 1f9b45d31..a8b9a4868 100644 --- a/program-tests/utils/src/indexer/test_indexer.rs +++ b/program-tests/utils/src/indexer/test_indexer.rs @@ -90,7 +90,8 @@ use solana_sdk::{ signer::Signer, }; use spl_token::instruction::initialize_mint; - +use light_batched_merkle_tree::initialize_address_tree::InitAddressTreeAccountsInstructionData; +use light_program_test::test_batch_forester::create_batch_address_merkle_tree; use crate::{ create_address_merkle_tree_and_queue_account_with_assert, e2e_test_env::KeypairActionConfig, spl::create_initialize_mint_instructions, @@ -142,6 +143,60 @@ impl Indexer for TestIndexer { Err(IndexerError::Custom("Merkle tree not found".to_string())) } + fn get_proof_by_index(&mut self, merkle_tree_pubkey: Pubkey, index: u64) -> ProofOfLeaf { + let mut bundle = self + .state_merkle_trees + .iter_mut() + .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) + .unwrap(); + + while bundle.merkle_tree.leaves().len() <= index as usize { + bundle.merkle_tree.append(&[0u8; 32]).unwrap(); + } + + let leaf = match bundle.merkle_tree.get_leaf(index as usize) { + Ok(leaf) => leaf, + Err(_) => { + bundle.merkle_tree.append(&[0u8; 32]).unwrap(); + bundle.merkle_tree.get_leaf(index as usize).unwrap() + } + }; + + let proof = bundle + .merkle_tree + .get_proof_of_leaf(index as usize, true) + .unwrap() + .to_vec(); + + ProofOfLeaf { leaf, proof } + } + + fn get_proofs_by_indices( + &mut self, + merkle_tree_pubkey: Pubkey, + indices: &[u64], + ) -> Vec { + indices + .iter() + .map(|&index| self.get_proof_by_index(merkle_tree_pubkey, index)) + .collect() + } + + /// leaf index, leaf, tx hash + fn get_leaf_indices_tx_hashes( + &mut self, + merkle_tree_pubkey: Pubkey, + zkp_batch_size: usize, + ) -> Vec<(u32, [u8; 32], [u8; 32])> { + let mut state_merkle_tree_bundle = self + .state_merkle_trees + .iter_mut() + .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) + .unwrap(); + + state_merkle_tree_bundle.input_leaf_indices[..zkp_batch_size].to_vec() + } + async fn get_subtrees( &self, merkle_tree_pubkey: [u8; 32], @@ -346,121 +401,6 @@ impl Indexer for TestIndexer { &self.group_pda } - /// leaf index, leaf, tx hash - fn get_leaf_indices_tx_hashes( - &mut self, - merkle_tree_pubkey: Pubkey, - zkp_batch_size: usize, - ) -> Vec<(u32, [u8; 32], [u8; 32])> { - let mut state_merkle_tree_bundle = self - .state_merkle_trees - .iter_mut() - .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) - .unwrap(); - - state_merkle_tree_bundle.input_leaf_indices[..zkp_batch_size].to_vec() - } - - async fn create_proof_for_compressed_accounts2( - &mut self, - compressed_accounts: Option>, - state_merkle_tree_pubkeys: Option>, - new_addresses: Option<&[[u8; 32]]>, - address_merkle_tree_pubkeys: Option>, - rpc: &mut R, - ) -> BatchedTreeProofRpcResult { - let mut indices_to_remove = Vec::new(); - - // for all accounts in batched trees, check whether values are in tree or queue - let (compressed_accounts, state_merkle_tree_pubkeys) = - if let Some((compressed_accounts, state_merkle_tree_pubkeys)) = - compressed_accounts.zip(state_merkle_tree_pubkeys) - { - for (i, (compressed_account, state_merkle_tree_pubkey)) in compressed_accounts - .iter() - .zip(state_merkle_tree_pubkeys.iter()) - .enumerate() - { - let accounts = self.state_merkle_trees.iter().find(|x| { - x.accounts.merkle_tree == *state_merkle_tree_pubkey && x.version == 2 - }); - if let Some(accounts) = accounts { - let output_queue_pubkey = accounts.accounts.nullifier_queue; - let mut queue = - AccountZeroCopy::::new(rpc, output_queue_pubkey) - .await; - let queue_zero_copy = BatchedQueueAccount::output_queue_from_bytes_mut( - queue.account.data.as_mut_slice(), - ) - .unwrap(); - for value_array in queue_zero_copy.value_vecs.iter() { - let index = value_array.iter().position(|x| *x == *compressed_account); - if index.is_some() { - indices_to_remove.push(i); - } - } - } - } - let compress_accounts = compressed_accounts - .iter() - .enumerate() - .filter(|(i, _)| !indices_to_remove.contains(i)) - .map(|(_, x)| *x) - .collect::>(); - let state_merkle_tree_pubkeys = state_merkle_tree_pubkeys - .iter() - .enumerate() - .filter(|(i, _)| !indices_to_remove.contains(i)) - .map(|(_, x)| *x) - .collect::>(); - if compress_accounts.is_empty() { - (None, None) - } else { - (Some(compress_accounts), Some(state_merkle_tree_pubkeys)) - } - } else { - (None, None) - }; - let rpc_result = if (compressed_accounts.is_some() - && !compressed_accounts.as_ref().unwrap().is_empty()) - || address_merkle_tree_pubkeys.is_some() - { - Some( - self.create_proof_for_compressed_accounts( - compressed_accounts, - state_merkle_tree_pubkeys, - new_addresses, - address_merkle_tree_pubkeys, - rpc, - ) - .await, - ) - } else { - None - }; - let address_root_indices = if let Some(rpc_result) = rpc_result.as_ref() { - rpc_result.address_root_indices.clone() - } else { - Vec::new() - }; - let root_indices = { - let mut root_indices = if let Some(rpc_result) = rpc_result.as_ref() { - rpc_result.root_indices.clone() - } else { - Vec::new() - }; - for index in indices_to_remove { - root_indices.insert(index, None); - } - root_indices - }; - BatchedTreeProofRpcResult { - proof: rpc_result.map(|x| x.proof), - root_indices, - address_root_indices, - } - } - async fn create_proof_for_compressed_accounts( &mut self, compressed_accounts: Option>, @@ -642,6 +582,106 @@ impl Indexer for TestIndexer { panic!("Failed to get proof from server"); } + async fn create_proof_for_compressed_accounts2( + &mut self, + compressed_accounts: Option>, + state_merkle_tree_pubkeys: Option>, + new_addresses: Option<&[[u8; 32]]>, + address_merkle_tree_pubkeys: Option>, + rpc: &mut R, + ) -> BatchedTreeProofRpcResult { + let mut indices_to_remove = Vec::new(); + + // for all accounts in batched trees, check whether values are in tree or queue + let (compressed_accounts, state_merkle_tree_pubkeys) = + if let Some((compressed_accounts, state_merkle_tree_pubkeys)) = + compressed_accounts.zip(state_merkle_tree_pubkeys) + { + for (i, (compressed_account, state_merkle_tree_pubkey)) in compressed_accounts + .iter() + .zip(state_merkle_tree_pubkeys.iter()) + .enumerate() + { + let accounts = self.state_merkle_trees.iter().find(|x| { + x.accounts.merkle_tree == *state_merkle_tree_pubkey && x.version == 2 + }); + if let Some(accounts) = accounts { + let output_queue_pubkey = accounts.accounts.nullifier_queue; + let mut queue = + AccountZeroCopy::::new(rpc, output_queue_pubkey) + .await; + let queue_zero_copy = BatchedQueueAccount::output_queue_from_bytes_mut( + queue.account.data.as_mut_slice(), + ) + .unwrap(); + for value_array in queue_zero_copy.value_vecs.iter() { + let index = value_array.iter().position(|x| *x == *compressed_account); + if index.is_some() { + indices_to_remove.push(i); + } + } + } + } + let compress_accounts = compressed_accounts + .iter() + .enumerate() + .filter(|(i, _)| !indices_to_remove.contains(i)) + .map(|(_, x)| *x) + .collect::>(); + let state_merkle_tree_pubkeys = state_merkle_tree_pubkeys + .iter() + .enumerate() + .filter(|(i, _)| !indices_to_remove.contains(i)) + .map(|(_, x)| *x) + .collect::>(); + if compress_accounts.is_empty() { + (None, None) + } else { + (Some(compress_accounts), Some(state_merkle_tree_pubkeys)) + } + } else { + (None, None) + }; + let rpc_result = if (compressed_accounts.is_some() + && !compressed_accounts.as_ref().unwrap().is_empty()) + || address_merkle_tree_pubkeys.is_some() + { + Some( + self.create_proof_for_compressed_accounts( + compressed_accounts, + state_merkle_tree_pubkeys, + new_addresses, + address_merkle_tree_pubkeys, + rpc, + ) + .await, + ) + } else { + None + }; + let address_root_indices = if let Some(rpc_result) = rpc_result.as_ref() { + rpc_result.address_root_indices.clone() + } else { + Vec::new() + }; + let root_indices = { + let mut root_indices = if let Some(rpc_result) = rpc_result.as_ref() { + rpc_result.root_indices.clone() + } else { + Vec::new() + }; + for index in indices_to_remove { + root_indices.insert(index, None); + } + root_indices + }; + BatchedTreeProofRpcResult { + proof: rpc_result.map(|x| x.proof), + root_indices, + address_root_indices, + } + } + fn add_address_merkle_tree_accounts( &mut self, merkle_tree_keypair: &Keypair, @@ -804,43 +844,42 @@ impl Indexer for TestIndexer { } } - fn get_proofs_by_indices( + async fn finalize_batched_address_tree_update( &mut self, + rpc: &mut R, merkle_tree_pubkey: Pubkey, - indices: &[u64], - ) -> Vec { - indices - .iter() - .map(|&index| self.get_proof_by_index(merkle_tree_pubkey, index)) - .collect() - } - - fn get_proof_by_index(&mut self, merkle_tree_pubkey: Pubkey, index: u64) -> ProofOfLeaf { - let mut bundle = self - .state_merkle_trees + ) { + let mut account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); + let onchain_account = + BatchedMerkleTreeAccount::address_tree_from_bytes_mut(account.data.as_mut_slice()) + .unwrap(); + let address_tree = self + .address_merkle_trees .iter_mut() .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) .unwrap(); + let address_tree_index = address_tree.merkle_tree.merkle_tree.rightmost_index; + let onchain_next_index = onchain_account.get_metadata().next_index; + let diff_onchain_indexer = onchain_next_index - address_tree_index as u64; + let addresses = address_tree.queue_elements[0..diff_onchain_indexer as usize].to_vec(); - while bundle.merkle_tree.leaves().len() <= index as usize { - bundle.merkle_tree.append(&[0u8; 32]).unwrap(); + for _ in 0..diff_onchain_indexer { + address_tree.queue_elements.remove(0); + } + for new_element_value in &addresses { + address_tree + .merkle_tree + .append( + &BigUint::from_bytes_be(new_element_value), + &mut address_tree.indexed_array, + ) + .unwrap(); } - let leaf = match bundle.merkle_tree.get_leaf(index as usize) { - Ok(leaf) => leaf, - Err(_) => { - bundle.merkle_tree.append(&[0u8; 32]).unwrap(); - bundle.merkle_tree.get_leaf(index as usize).unwrap() - } - }; - - let proof = bundle - .merkle_tree - .get_proof_of_leaf(index as usize, true) - .unwrap() - .to_vec(); - - ProofOfLeaf { leaf, proof } + let onchain_root = onchain_account.root_history.last().unwrap(); + let new_root = address_tree.merkle_tree.root(); + assert_eq!(*onchain_root, new_root); + println!("finalized batched address tree update"); } } @@ -1051,7 +1090,7 @@ impl TestIndexer { } } - pub async fn add_address_merkle_tree( + async fn add_address_merkle_tree_v1( &mut self, rpc: &mut R, merkle_tree_keypair: &Keypair, @@ -1070,11 +1109,73 @@ impl TestIndexer { &AddressQueueConfig::default(), 0, ) - .await - .unwrap(); + .await + .unwrap(); self.add_address_merkle_tree_accounts(merkle_tree_keypair, queue_keypair, owning_program_id) } + async fn add_address_merkle_tree_v2( + &mut self, + rpc: &mut R, + merkle_tree_keypair: &Keypair, + queue_keypair: &Keypair, + owning_program_id: Option, + ) -> AddressMerkleTreeAccounts { + info!( + "Adding address merkle tree accounts v2 {:?}", + merkle_tree_keypair.pubkey() + ); + + let params = InitAddressTreeAccountsInstructionData::test_default(); + + info!( + "Creating batched address merkle tree {:?}", + merkle_tree_keypair.pubkey() + ); + create_batch_address_merkle_tree(rpc, &self.payer, merkle_tree_keypair, params) + .await + .unwrap(); + info!( + "Batched address merkle tree created {:?}", + merkle_tree_keypair.pubkey() + ); + + self.add_address_merkle_tree_accounts(merkle_tree_keypair, queue_keypair, owning_program_id) + } + + pub async fn add_address_merkle_tree( + &mut self, + rpc: &mut R, + merkle_tree_keypair: &Keypair, + queue_keypair: &Keypair, + owning_program_id: Option, + version: u64, + ) -> AddressMerkleTreeAccounts { + if version == 1 { + self.add_address_merkle_tree_v1( + rpc, + merkle_tree_keypair, + queue_keypair, + owning_program_id, + ) + .await + } else if version == 2 { + self.add_address_merkle_tree_v2( + rpc, + merkle_tree_keypair, + queue_keypair, + owning_program_id, + ) + .await + } else { + panic!( + "add_address_merkle_tree: Version not supported, {}. Versions: 1, 2", + version + ) + } + } + + #[allow(clippy::too_many_arguments)] pub async fn add_state_merkle_tree( &mut self, @@ -1666,37 +1767,13 @@ impl TestIndexer { } } - pub fn finalize_batched_address_tree_update( - &mut self, + pub(crate) fn get_address_merkle_tree( + &self, merkle_tree_pubkey: Pubkey, - onchain_account: &BatchedMerkleTreeAccount, - ) { - let address_tree = self - .address_merkle_trees - .iter_mut() + ) -> Option<&AddressMerkleTreeBundle> { + self.address_merkle_trees + .iter() .find(|x| x.accounts.merkle_tree == merkle_tree_pubkey) - .unwrap(); - let address_tree_index = address_tree.merkle_tree.merkle_tree.rightmost_index; - let onchain_next_index = onchain_account.get_metadata().next_index; - let diff_onchain_indexer = onchain_next_index - address_tree_index as u64; - let addresses = address_tree.queue_elements[0..diff_onchain_indexer as usize].to_vec(); - - for _ in 0..diff_onchain_indexer { - address_tree.queue_elements.remove(0); - } - for new_element_value in &addresses { - address_tree - .merkle_tree - .append( - &BigUint::from_bytes_be(new_element_value), - &mut address_tree.indexed_array, - ) - .unwrap(); - } - - let onchain_root = onchain_account.root_history.last().unwrap(); - let new_root = address_tree.merkle_tree.root(); - assert_eq!(*onchain_root, new_root); - println!("finalized batched address tree update"); } + } diff --git a/prover/client/src/gnark/helpers.rs b/prover/client/src/gnark/helpers.rs index dd6516b7c..3658c082b 100644 --- a/prover/client/src/gnark/helpers.rs +++ b/prover/client/src/gnark/helpers.rs @@ -272,6 +272,7 @@ pub struct LightValidatorConfig { pub enable_indexer: bool, pub prover_config: Option, pub wait_time: u64, + pub sbf_programs: Vec<(String, String)>, } impl Default for LightValidatorConfig { @@ -280,6 +281,7 @@ impl Default for LightValidatorConfig { enable_indexer: false, prover_config: None, wait_time: 35, + sbf_programs: vec![], } } } @@ -291,6 +293,14 @@ pub async fn spawn_validator(config: LightValidatorConfig) { if !config.enable_indexer { path.push_str(" --skip-indexer"); } + + for sbf_program in config.sbf_programs.iter() { + path.push_str(&format!( + " --sbf-program {} {}", + sbf_program.0, sbf_program.1 + )); + } + if let Some(prover_config) = config.prover_config { prover_config.circuits.iter().for_each(|circuit| { path.push_str(&format!(" --circuit {}", circuit));