From 1bcee8c1d20f98211af1447a5514e1b782223a6c Mon Sep 17 00:00:00 2001 From: jeffhelius Date: Wed, 4 Sep 2024 17:44:47 +0000 Subject: [PATCH] add async changes to accounts --- rpc/src/parsed_token_accounts.rs | 14 +- rpc/src/rpc.rs | 385 +++++++++++++++++++------------ rpc/src/rpc_subscriptions.rs | 2 +- 3 files changed, 241 insertions(+), 160 deletions(-) diff --git a/rpc/src/parsed_token_accounts.rs b/rpc/src/parsed_token_accounts.rs index d93cda521e65bd..5ffa1b7c2e1894 100644 --- a/rpc/src/parsed_token_accounts.rs +++ b/rpc/src/parsed_token_accounts.rs @@ -19,19 +19,11 @@ pub fn get_parsed_token_account( bank: &Bank, pubkey: &Pubkey, account: AccountSharedData, - // only used for simulation results - overwrite_accounts: Option<&HashMap>, ) -> UiAccount { let additional_data = get_token_account_mint(account.data()) - .and_then(|mint_pubkey| { - account_resolver::get_account_from_overwrites_or_bank( - &mint_pubkey, - bank, - overwrite_accounts, - ) - }) - .map(|mint_account| AccountAdditionalData { - spl_token_decimals: get_mint_decimals(mint_account.data()).ok(), + .and_then(|mint_pubkey| get_mint_owner_and_decimals(bank, &mint_pubkey).ok()) + .map(|(_, decimals)| AccountAdditionalData { + spl_token_decimals: Some(decimals), }); UiAccount::encode( diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 2cf27c779ae0a3..e85dc23d409f87 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -455,9 +455,9 @@ impl JsonRpcRequestProcessor { } } - pub fn get_account_info( + pub async fn get_account_info( &self, - pubkey: &Pubkey, + pubkey: Pubkey, config: Option, ) -> Result>> { let RpcAccountInfoConfig { @@ -472,11 +472,18 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); - let response = get_encoded_account(&bank, pubkey, encoding, data_slice, None)?; + let response = self + .runtime + .spawn_blocking({ + let bank = Arc::clone(&bank); + move || get_encoded_account(&bank, &pubkey, encoding, data_slice) + }) + .await + .expect("rpc: get_encoded_account panicked")?; Ok(new_response(&bank, response)) } - pub fn get_multiple_accounts( + pub async fn get_multiple_accounts( &self, pubkeys: Vec, config: Option, @@ -493,10 +500,18 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Base64); - let accounts = pubkeys - .into_iter() - .map(|pubkey| get_encoded_account(&bank, &pubkey, encoding, data_slice, None)) - .collect::>>()?; + let mut accounts = Vec::with_capacity(pubkeys.len()); + for pubkey in pubkeys { + let bank = Arc::clone(&bank); + accounts.push( + self.runtime + .spawn_blocking(move || { + get_encoded_account(&bank, &pubkey, encoding, data_slice) + }) + .await + .expect("rpc: get_encoded_account panicked")?, + ); + } Ok(new_response(&bank, accounts)) } @@ -509,9 +524,9 @@ impl JsonRpcRequestProcessor { .get_minimum_balance_for_rent_exemption(data_len) } - pub fn get_program_accounts( + pub async fn get_program_accounts( &self, - program_id: &Pubkey, + program_id: Pubkey, config: Option, mut filters: Vec, with_context: bool, @@ -529,18 +544,31 @@ impl JsonRpcRequestProcessor { let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); optimize_filters(&mut filters); let keyed_accounts = { - if let Some(owner) = get_spl_token_owner_filter(program_id, &filters) { - self.get_filtered_spl_token_accounts_by_owner(&bank, program_id, &owner, filters)? - } else if let Some(mint) = get_spl_token_mint_filter(program_id, &filters) { - self.get_filtered_spl_token_accounts_by_mint(&bank, program_id, &mint, filters)? + if let Some(owner) = get_spl_token_owner_filter(&program_id, &filters) { + self.get_filtered_spl_token_accounts_by_owner( + Arc::clone(&bank), + program_id, + owner, + filters, + ) + .await? + } else if let Some(mint) = get_spl_token_mint_filter(&program_id, &filters) { + self.get_filtered_spl_token_accounts_by_mint( + Arc::clone(&bank), + program_id, + mint, + filters, + ) + .await? } else { - self.get_filtered_program_accounts(&bank, program_id, filters)? + self.get_filtered_program_accounts(Arc::clone(&bank), program_id, filters) + .await? } }; - let accounts = if is_known_spl_token_id(program_id) + let accounts = if is_known_spl_token_id(&program_id) && encoding == UiAccountEncoding::JsonParsed { - get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() + get_parsed_token_accounts(Arc::clone(&bank), keyed_accounts.into_iter()).collect() } else { keyed_accounts .into_iter() @@ -1883,20 +1911,21 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, supply)) } - pub fn get_token_largest_accounts( + pub async fn get_token_largest_accounts( &self, - mint: &Pubkey, + mint: Pubkey, commitment: Option, ) -> Result>> { let bank = self.bank(commitment); - let (mint_owner, decimals) = get_mint_owner_and_decimals(&bank, mint)?; + let (mint_owner, decimals) = get_mint_owner_and_decimals(&bank, &mint)?; if !is_known_spl_token_id(&mint_owner) { return Err(Error::invalid_params( "Invalid param: not a Token mint".to_string(), )); } let mut token_balances: Vec = self - .get_filtered_spl_token_accounts_by_mint(&bank, &mint_owner, mint, vec![])? + .get_filtered_spl_token_accounts_by_mint(Arc::clone(&bank), mint_owner, mint, vec![]) + .await? .into_iter() .map(|(address, account)| { let amount = StateWithExtensions::::unpack(account.data()) @@ -1921,9 +1950,9 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, token_balances)) } - pub fn get_token_accounts_by_owner( + pub async fn get_token_accounts_by_owner( &self, - owner: &Pubkey, + owner: Pubkey, token_account_filter: TokenAccountsFilter, config: Option, ) -> Result>> { @@ -1949,12 +1978,14 @@ impl JsonRpcRequestProcessor { ))); } - let keyed_accounts = self.get_filtered_spl_token_accounts_by_owner( - &bank, - &token_program_id, - owner, - filters, - )?; + let keyed_accounts = self + .get_filtered_spl_token_accounts_by_owner( + Arc::clone(&bank), + token_program_id, + owner, + filters, + ) + .await?; let accounts = if encoding == UiAccountEncoding::JsonParsed { get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() } else { @@ -1971,9 +2002,9 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, accounts)) } - pub fn get_token_accounts_by_delegate( + pub async fn get_token_accounts_by_delegate( &self, - delegate: &Pubkey, + delegate: Pubkey, token_account_filter: TokenAccountsFilter, config: Option, ) -> Result>> { @@ -2001,11 +2032,18 @@ impl JsonRpcRequestProcessor { ]; // Optional filter on Mint address, uses mint account index for scan let keyed_accounts = if let Some(mint) = mint { - self.get_filtered_spl_token_accounts_by_mint(&bank, &token_program_id, &mint, filters)? + self.get_filtered_spl_token_accounts_by_mint( + Arc::clone(&bank), + token_program_id, + mint, + filters, + ) + .await? } else { // Filter on Token Account state filters.push(RpcFilterType::TokenAccountState); - self.get_filtered_program_accounts(&bank, &token_program_id, filters)? + self.get_filtered_program_accounts(Arc::clone(&bank), token_program_id, filters) + .await? }; let accounts = if encoding == UiAccountEncoding::JsonParsed { get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() @@ -2024,14 +2062,14 @@ impl JsonRpcRequestProcessor { } /// Use a set of filters to get an iterator of keyed program accounts from a bank - fn get_filtered_program_accounts( + async fn get_filtered_program_accounts( &self, - bank: &Bank, - program_id: &Pubkey, + bank: Arc, + program_id: Pubkey, mut filters: Vec, ) -> RpcCustomResult> { optimize_filters(&mut filters); - let filter_closure = |account: &AccountSharedData| { + let filter_closure = move |account: &AccountSharedData| { filters .iter() .all(|filter_type| filter_type.allows(account)) @@ -2041,44 +2079,56 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::ProgramId) { - if !self.config.account_indexes.include_key(program_id) { + if !self.config.account_indexes.include_key(&program_id) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: program_id.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::ProgramId(*program_id), - |account| { - // The program-id account index checks for Account owner on inclusion. However, due - // to the current AccountsDb implementation, an account may remain in storage as a - // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later - // updates. We include the redundant filters here to avoid returning these - // accounts. - account.owner() == program_id && filter_closure(account) - }, - &ScanConfig::default(), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::ProgramId(program_id), + |account| { + // The program-id account index checks for Account owner on inclusion. However, due + // to the current AccountsDb implementation, an account may remain in storage as a + // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later + // updates. We include the redundant filters here to avoid returning these + // accounts. + account.owner() == &program_id && filter_closure(account) + }, + &ScanConfig::default(), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("Failed to spawn blocking task") } else { // this path does not need to provide a mb limit because we only want to support secondary indexes - Ok(bank - .get_filtered_program_accounts(program_id, filter_closure, &ScanConfig::default()) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_program_accounts( + &program_id, + filter_closure, + &ScanConfig::default(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("Failed to spawn blocking task") } } /// Get an iterator of spl-token accounts by owner address - fn get_filtered_spl_token_accounts_by_owner( + async fn get_filtered_spl_token_accounts_by_owner( &self, - bank: &Bank, - program_id: &Pubkey, - owner_key: &Pubkey, + bank: Arc, + program_id: Pubkey, + owner_key: Pubkey, mut filters: Vec, ) -> RpcCustomResult> { // The by-owner accounts index checks for Token Account state and Owner address on @@ -2099,37 +2149,42 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::SplTokenOwner) { - if !self.config.account_indexes.include_key(owner_key) { + if !self.config.account_indexes.include_key(&owner_key) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: owner_key.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::SplTokenOwner(*owner_key), - |account| { - account.owner() == program_id - && filters - .iter() - .all(|filter_type| filter_type.allows(account)) - }, - &ScanConfig::default(), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::SplTokenOwner(owner_key), + |account| { + account.owner() == &program_id + && filters + .iter() + .all(|filter_type| filter_type.allows(account)) + }, + &ScanConfig::default(), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("rpc: get_filtered_indexed_account panicked") } else { self.get_filtered_program_accounts(bank, program_id, filters) + .await } } /// Get an iterator of spl-token accounts by mint address - fn get_filtered_spl_token_accounts_by_mint( + async fn get_filtered_spl_token_accounts_by_mint( &self, - bank: &Bank, - program_id: &Pubkey, - mint_key: &Pubkey, + bank: Arc, + program_id: Pubkey, + mint_key: Pubkey, mut filters: Vec, ) -> RpcCustomResult> { // The by-mint accounts index checks for Token Account state and Mint address on inclusion. @@ -2149,28 +2204,33 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::SplTokenMint) { - if !self.config.account_indexes.include_key(mint_key) { + if !self.config.account_indexes.include_key(&mint_key) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: mint_key.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::SplTokenMint(*mint_key), - |account| { - account.owner() == program_id - && filters - .iter() - .all(|filter_type| filter_type.allows(account)) - }, - &ScanConfig::default(), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::SplTokenMint(mint_key), + |account| { + account.owner() == &program_id + && filters + .iter() + .all(|filter_type| filter_type.allows(account)) + }, + &ScanConfig::default(), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("rpc: get_filtered_indexed_account panicked") } else { self.get_filtered_program_accounts(bank, program_id, filters) + .await } } @@ -2323,15 +2383,13 @@ fn get_encoded_account( pubkey: &Pubkey, encoding: UiAccountEncoding, data_slice: Option, - // only used for simulation results - overwrite_accounts: Option<&HashMap>, ) -> Result> { - match account_resolver::get_account_from_overwrites_or_bank(pubkey, bank, overwrite_accounts) { + match bank.get_account(pubkey) { Some(account) => { let response = if is_known_spl_token_id(account.owner()) && encoding == UiAccountEncoding::JsonParsed { - get_parsed_token_account(bank, pubkey, account, overwrite_accounts) + get_parsed_token_account(bank, pubkey, account) } else { encode_account(&account, pubkey, encoding, data_slice)? }; @@ -2999,7 +3057,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_str: String, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getMultipleAccounts")] fn get_multiple_accounts( @@ -3007,7 +3065,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_strs: Vec, config: Option, - ) -> Result>>>; + ) -> BoxFuture>>>>; #[rpc(meta, name = "getBlockCommitment")] fn get_block_commitment( @@ -3046,10 +3104,13 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_str: String, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!("get_account_info rpc request received: {:?}", pubkey_str); - let pubkey = verify_pubkey(&pubkey_str)?; - meta.get_account_info(&pubkey, config) + let pubkey = match verify_pubkey(&pubkey_str) { + Ok(pubkey) => pubkey, + Err(e) => return Box::pin(future::err(e)), + }; + Box::pin(async move { meta.get_account_info(pubkey, config).await }) } fn get_multiple_accounts( @@ -3057,7 +3118,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_strs: Vec, config: Option, - ) -> Result>>> { + ) -> BoxFuture>>>> { debug!( "get_multiple_accounts rpc request received: {:?}", pubkey_strs.len() @@ -3068,15 +3129,19 @@ pub mod rpc_accounts { .max_multiple_accounts .unwrap_or(MAX_MULTIPLE_ACCOUNTS); if pubkey_strs.len() > max_multiple_accounts { - return Err(Error::invalid_params(format!( + return Box::pin(future::err(Error::invalid_params(format!( "Too many inputs provided; max {max_multiple_accounts}" - ))); + )))); } let pubkeys = pubkey_strs .into_iter() .map(|pubkey_str| verify_pubkey(&pubkey_str)) - .collect::>>()?; - meta.get_multiple_accounts(pubkeys, config) + .collect::>>(); + let pubkeys = match pubkeys { + Ok(pubkeys) => pubkeys, + Err(err) => return Box::pin(future::err(err)), + }; + Box::pin(async move { meta.get_multiple_accounts(pubkeys, config).await }) } fn get_block_commitment( @@ -3130,7 +3195,7 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, program_id_str: String, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getLargestAccounts")] fn get_largest_accounts( @@ -3156,7 +3221,7 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, mint_str: String, commitment: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getTokenAccountsByOwner")] fn get_token_accounts_by_owner( @@ -3165,7 +3230,7 @@ pub mod rpc_accounts_scan { owner_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getTokenAccountsByDelegate")] fn get_token_accounts_by_delegate( @@ -3174,7 +3239,7 @@ pub mod rpc_accounts_scan { delegate_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; } pub struct AccountsScanImpl; @@ -3186,12 +3251,15 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, program_id_str: String, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_program_accounts rpc request received: {:?}", program_id_str ); - let program_id = verify_pubkey(&program_id_str)?; + let program_id = match verify_pubkey(&program_id_str) { + Ok(program_id) => program_id, + Err(e) => return Box::pin(future::err(e)), + }; let (config, filters, with_context) = if let Some(config) = config { ( Some(config.account_config), @@ -3202,14 +3270,19 @@ pub mod rpc_accounts_scan { (None, vec![], false) }; if filters.len() > MAX_GET_PROGRAM_ACCOUNT_FILTERS { - return Err(Error::invalid_params(format!( + return Box::pin(future::err(Error::invalid_params(format!( "Too many filters provided; max {MAX_GET_PROGRAM_ACCOUNT_FILTERS}" - ))); + )))); } for filter in &filters { - verify_filter(filter)?; + if let Err(e) = verify_filter(filter) { + return Box::pin(future::err(e)); + } } - meta.get_program_accounts(&program_id, config, filters, with_context) + Box::pin(async move { + meta.get_program_accounts(program_id, config, filters, with_context) + .await + }) } fn get_largest_accounts( @@ -3235,13 +3308,16 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, mint_str: String, commitment: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_largest_accounts rpc request received: {:?}", mint_str ); - let mint = verify_pubkey(&mint_str)?; - meta.get_token_largest_accounts(&mint, commitment) + let mint = match verify_pubkey(&mint_str) { + Ok(mint) => mint, + Err(e) => return Box::pin(future::err(e)), + }; + Box::pin(async move { meta.get_token_largest_accounts(mint, commitment).await }) } fn get_token_accounts_by_owner( @@ -3250,14 +3326,23 @@ pub mod rpc_accounts_scan { owner_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_accounts_by_owner rpc request received: {:?}", owner_str ); - let owner = verify_pubkey(&owner_str)?; - let token_account_filter = verify_token_account_filter(token_account_filter)?; - meta.get_token_accounts_by_owner(&owner, token_account_filter, config) + let owner = match verify_pubkey(&owner_str) { + Ok(owner) => owner, + Err(e) => return Box::pin(future::err(e)), + }; + let token_account_filter = match verify_token_account_filter(token_account_filter) { + Ok(token_account_filter) => token_account_filter, + Err(e) => return Box::pin(future::err(e)), + }; + Box::pin(async move { + meta.get_token_accounts_by_owner(owner, token_account_filter, config) + .await + }) } fn get_token_accounts_by_delegate( @@ -3266,14 +3351,23 @@ pub mod rpc_accounts_scan { delegate_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_accounts_by_delegate rpc request received: {:?}", delegate_str ); - let delegate = verify_pubkey(&delegate_str)?; - let token_account_filter = verify_token_account_filter(token_account_filter)?; - meta.get_token_accounts_by_delegate(&delegate, token_account_filter, config) + let delegate = match verify_pubkey(&delegate_str) { + Ok(delegate) => delegate, + Err(e) => return Box::pin(future::err(e)), + }; + let token_account_filter = match verify_token_account_filter(token_account_filter) { + Ok(token_account_filter) => token_account_filter, + Err(e) => return Box::pin(future::err(e)), + }; + Box::pin(async move { + meta.get_token_accounts_by_delegate(delegate, token_account_filter, config) + .await + }) } } } @@ -3813,24 +3907,19 @@ pub mod rpc_full { if result.is_err() { Some(vec![None; config_accounts.addresses.len()]) } else { - let mut post_simulation_accounts_map = HashMap::new(); - for (pubkey, data) in post_simulation_accounts { - post_simulation_accounts_map.insert(pubkey, data); - } - Some( config_accounts .addresses .iter() .map(|address_str| { - let pubkey = verify_pubkey(address_str)?; - get_encoded_account( - bank, - &pubkey, - accounts_encoding, - None, - Some(&post_simulation_accounts_map), - ) + let address = verify_pubkey(address_str)?; + post_simulation_accounts + .iter() + .find(|(key, _account)| key == &address) + .map(|(pubkey, account)| { + encode_account(account, pubkey, accounts_encoding, None) + }) + .transpose() }) .collect::>>()?, ) diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index fa833f2179cc58..4f2a81ae39b174 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -385,7 +385,7 @@ fn filter_account_result( if is_known_spl_token_id(account.owner()) && params.encoding == UiAccountEncoding::JsonParsed { - get_parsed_token_account(&bank, ¶ms.pubkey, account, None) + get_parsed_token_account(&bank, ¶ms.pubkey, account) } else { UiAccount::encode(¶ms.pubkey, &account, params.encoding, None, None) }