From e023719c5814bed4aec4f375337a4bdb2d08f691 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 1 Jun 2020 22:45:51 -0700 Subject: [PATCH] Add preflight checks to sendTransaction RPC method (bp #10338) (#10362) automerge --- cli/src/cli.rs | 119 ++++++++++++++++++++-- cli/src/cluster_query.rs | 12 +-- cli/src/display.rs | 10 ++ client/src/rpc_client.rs | 104 +++++-------------- client/src/rpc_config.rs | 8 +- core/src/lib.rs | 1 + core/src/rpc.rs | 190 +++++++++++++++++++++++++++++++---- core/src/rpc_error.rs | 9 ++ core/src/rpc_health.rs | 120 ++++++++++++++++++++++ core/src/rpc_service.rs | 126 ++++++----------------- docs/src/apps/jsonrpc-api.md | 11 +- stake-monitor/src/lib.rs | 93 ++++++++++------- tokens/src/thin_client.rs | 179 +++++++++++++++++++++++++++++++++ 13 files changed, 734 insertions(+), 248 deletions(-) create mode 100644 core/src/rpc_health.rs create mode 100644 tokens/src/thin_client.rs diff --git a/cli/src/cli.rs b/cli/src/cli.rs index e7bda5db27901a..91b5dad92c961d 100644 --- a/cli/src/cli.rs +++ b/cli/src/cli.rs @@ -1,7 +1,7 @@ use crate::{ cli_output::{CliAccount, CliSignOnlyData, CliSignature, OutputFormat}, cluster_query::*, - display::println_name_value, + display::{new_spinner_progress_bar, println_name_value, println_transaction}, nonce::{self, *}, offline::{blockhash_query::BlockhashQuery, *}, stake::*, @@ -26,7 +26,7 @@ use solana_clap_utils::{ use solana_client::{ client_error::{ClientErrorKind, Result as ClientResult}, rpc_client::RpcClient, - rpc_config::RpcLargestAccountsFilter, + rpc_config::{RpcLargestAccountsFilter, RpcSendTransactionConfig}, rpc_response::{RpcAccount, RpcKeyedAccount}, }; #[cfg(not(test))] @@ -36,7 +36,7 @@ use solana_faucet::faucet_mock::request_airdrop_transaction; use solana_remote_wallet::remote_wallet::RemoteWalletManager; use solana_sdk::{ bpf_loader, - clock::{Epoch, Slot}, + clock::{Epoch, Slot, DEFAULT_TICKS_PER_SECOND}, commitment_config::CommitmentConfig, fee_calculator::FeeCalculator, hash::Hash, @@ -47,6 +47,7 @@ use solana_sdk::{ program_utils::DecodeError, pubkey::{Pubkey, MAX_SEED_LEN}, signature::{Keypair, Signature, Signer, SignerError}, + signers::Signers, system_instruction::{self, SystemError}, system_program, transaction::{Transaction, TransactionError}, @@ -1231,7 +1232,7 @@ fn process_confirm( "\nTransaction executed in slot {}:", confirmed_transaction.slot ); - crate::display::println_transaction( + println_transaction( &confirmed_transaction .transaction .transaction @@ -1261,7 +1262,7 @@ fn process_confirm( } fn process_decode_transaction(transaction: &Transaction) -> ProcessResult { - crate::display::println_transaction(transaction, &None, ""); + println_transaction(transaction, &None, ""); Ok("".to_string()) } @@ -1299,6 +1300,103 @@ fn process_show_account( Ok(account_string) } +fn send_and_confirm_transactions_with_spinner( + rpc_client: &RpcClient, + mut transactions: Vec, + signer_keys: &T, +) -> Result<(), Box> { + let progress_bar = new_spinner_progress_bar(); + let mut send_retries = 5; + loop { + let mut status_retries = 15; + + // Send all transactions + let mut transactions_signatures = vec![]; + let num_transactions = transactions.len(); + for transaction in transactions { + if cfg!(not(test)) { + // Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors + // when all the write transactions modify the same program account (eg, deploying a + // new program) + sleep(Duration::from_millis(1000 / DEFAULT_TICKS_PER_SECOND)); + } + + let signature = rpc_client + .send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + }, + ) + .ok(); + transactions_signatures.push((transaction, signature)); + + progress_bar.set_message(&format!( + "[{}/{}] Transactions sent", + transactions_signatures.len(), + num_transactions + )); + } + + // Collect statuses for all the transactions, drop those that are confirmed + while status_retries > 0 { + status_retries -= 1; + + progress_bar.set_message(&format!( + "[{}/{}] Transactions confirmed", + num_transactions - transactions_signatures.len(), + num_transactions + )); + + if cfg!(not(test)) { + // Retry twice a second + sleep(Duration::from_millis(500)); + } + + transactions_signatures = transactions_signatures + .into_iter() + .filter(|(_transaction, signature)| { + if let Some(signature) = signature { + if let Ok(status) = rpc_client.get_signature_status(&signature) { + if rpc_client + .get_num_blocks_since_signature_confirmation(&signature) + .unwrap_or(0) + > 1 + { + return false; + } else { + return match status { + None => true, + Some(result) => result.is_err(), + }; + } + } + } + true + }) + .collect(); + + if transactions_signatures.is_empty() { + return Ok(()); + } + } + + if send_retries == 0 { + return Err("Transactions failed".into()); + } + send_retries -= 1; + + // Re-sign any failed transactions with a new blockhash and retry + let (blockhash, _fee_calculator) = rpc_client + .get_new_blockhash(&transactions_signatures[0].0.message().recent_blockhash)?; + transactions = vec![]; + for (mut transaction, _) in transactions_signatures.into_iter() { + transaction.try_sign(signer_keys, blockhash)?; + transactions.push(transaction); + } + } +} + fn process_deploy( rpc_client: &RpcClient, config: &CliConfig, @@ -1366,11 +1464,18 @@ fn process_deploy( })?; trace!("Writing program data"); - rpc_client.send_and_confirm_transactions(write_transactions, &signers)?; + send_and_confirm_transactions_with_spinner(&rpc_client, write_transactions, &signers).map_err( + |_| CliError::DynamicProgramError("Data writes to program account failed".to_string()), + )?; trace!("Finalizing program account"); rpc_client - .send_and_confirm_transaction_with_spinner(&finalize_tx) + .send_and_confirm_transaction_with_spinner_and_config( + &finalize_tx, + RpcSendTransactionConfig { + skip_preflight: true, + }, + ) .map_err(|e| { CliError::DynamicProgramError(format!("Program finalize transaction failed: {}", e)) })?; diff --git a/cli/src/cluster_query.rs b/cli/src/cluster_query.rs index 82556e6ab370e0..9d4ff655fc6605 100644 --- a/cli/src/cluster_query.rs +++ b/cli/src/cluster_query.rs @@ -1,11 +1,10 @@ use crate::{ cli::{check_account_for_fee, CliCommand, CliCommandInfo, CliConfig, CliError, ProcessResult}, cli_output::*, - display::println_name_value, + display::{new_spinner_progress_bar, println_name_value}, }; use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand}; use console::{style, Emoji}; -use indicatif::{ProgressBar, ProgressStyle}; use solana_clap_utils::{ commitment::{commitment_arg, COMMITMENT_ARG}, input_parsers::*, @@ -469,15 +468,6 @@ pub fn parse_transaction_history( }) } -/// Creates a new process bar for processing that will take an unknown amount of time -fn new_spinner_progress_bar() -> ProgressBar { - let progress_bar = ProgressBar::new(42); - progress_bar - .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); - progress_bar.enable_steady_tick(100); - progress_bar -} - pub fn process_catchup( rpc_client: &RpcClient, node_pubkey: &Pubkey, diff --git a/cli/src/display.rs b/cli/src/display.rs index 42328bd5a80fe3..b089564b17cff2 100644 --- a/cli/src/display.rs +++ b/cli/src/display.rs @@ -1,5 +1,6 @@ use crate::cli::SettingType; use console::style; +use indicatif::{ProgressBar, ProgressStyle}; use solana_sdk::{ hash::Hash, native_token::lamports_to_sol, program_utils::limited_deserialize, transaction::Transaction, @@ -200,3 +201,12 @@ pub fn println_transaction( } } } + +/// Creates a new process bar for processing that will take an unknown amount of time +pub fn new_spinner_progress_bar() -> ProgressBar { + let progress_bar = ProgressBar::new(42); + progress_bar + .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); + progress_bar.enable_steady_tick(100); + progress_bar +} diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 2de16c3e3d08e0..d111b27a1cb263 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -3,7 +3,7 @@ use crate::{ generic_rpc_client_request::GenericRpcClientRequest, mock_rpc_client_request::{MockRpcClientRequest, Mocks}, rpc_client_request::RpcClientRequest, - rpc_config::RpcLargestAccountsConfig, + rpc_config::{RpcLargestAccountsConfig, RpcSendTransactionConfig}, rpc_request::{RpcError, RpcRequest}, rpc_response::*, }; @@ -31,7 +31,6 @@ use solana_transaction_status::{ }; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use std::{ - error, net::SocketAddr, thread::sleep, time::{Duration, Instant}, @@ -95,10 +94,20 @@ impl RpcClient { } pub fn send_transaction(&self, transaction: &Transaction) -> ClientResult { + self.send_transaction_with_config(transaction, RpcSendTransactionConfig::default()) + } + + pub fn send_transaction_with_config( + &self, + transaction: &Transaction, + config: RpcSendTransactionConfig, + ) -> ClientResult { let serialized_encoded = bs58::encode(serialize(transaction).unwrap()).into_string(); - let signature_base58_str: String = - self.send(RpcRequest::SendTransaction, json!([serialized_encoded]))?; + let signature_base58_str: String = self.send( + RpcRequest::SendTransaction, + json!([serialized_encoded, config]), + )?; let signature = signature_base58_str .parse::() @@ -407,74 +416,6 @@ impl RpcClient { } } - pub fn send_and_confirm_transactions( - &self, - mut transactions: Vec, - signer_keys: &T, - ) -> Result<(), Box> { - let mut send_retries = 5; - loop { - let mut status_retries = 15; - - // Send all transactions - let mut transactions_signatures = vec![]; - for transaction in transactions { - if cfg!(not(test)) { - // Delay ~1 tick between write transactions in an attempt to reduce AccountInUse errors - // when all the write transactions modify the same program account (eg, deploying a - // new program) - sleep(Duration::from_millis(1000 / DEFAULT_TICKS_PER_SECOND)); - } - - let signature = self.send_transaction(&transaction).ok(); - transactions_signatures.push((transaction, signature)) - } - - // Collect statuses for all the transactions, drop those that are confirmed - while status_retries > 0 { - status_retries -= 1; - - if cfg!(not(test)) { - // Retry twice a second - sleep(Duration::from_millis(500)); - } - - transactions_signatures = transactions_signatures - .into_iter() - .filter(|(_transaction, signature)| { - if let Some(signature) = signature { - if let Ok(status) = self.get_signature_status(&signature) { - if status.is_none() { - return false; - } - return status.unwrap().is_err(); - } - } - true - }) - .collect(); - - if transactions_signatures.is_empty() { - return Ok(()); - } - } - - if send_retries == 0 { - return Err(RpcError::ForUser("Transactions failed".to_string()).into()); - } - send_retries -= 1; - - // Re-sign any failed transactions with a new blockhash and retry - let (blockhash, _fee_calculator) = - self.get_new_blockhash(&transactions_signatures[0].0.message().recent_blockhash)?; - transactions = vec![]; - for (mut transaction, _) in transactions_signatures.into_iter() { - transaction.try_sign(signer_keys, blockhash)?; - transactions.push(transaction); - } - } - } - pub fn resign_transaction( &self, tx: &mut Transaction, @@ -486,11 +427,7 @@ impl RpcClient { Ok(()) } - pub fn retry_get_balance( - &self, - pubkey: &Pubkey, - _retries: usize, - ) -> Result, Box> { + pub fn retry_get_balance(&self, pubkey: &Pubkey, _retries: usize) -> ClientResult> { let request = RpcRequest::GetBalance; let balance_json = self .client @@ -949,6 +886,17 @@ impl RpcClient { pub fn send_and_confirm_transaction_with_spinner( &self, transaction: &Transaction, + ) -> ClientResult { + self.send_and_confirm_transaction_with_spinner_and_config( + transaction, + RpcSendTransactionConfig::default(), + ) + } + + pub fn send_and_confirm_transaction_with_spinner_and_config( + &self, + transaction: &Transaction, + config: RpcSendTransactionConfig, ) -> ClientResult { let mut confirmations = 0; @@ -964,7 +912,7 @@ impl RpcClient { )); let mut status_retries = 15; let (signature, status) = loop { - let signature = self.send_transaction(transaction)?; + let signature = self.send_transaction_with_config(transaction, config.clone())?; // Get recent commitment in order to count confirmations for successful transactions let status = self diff --git a/client/src/rpc_config.rs b/client/src/rpc_config.rs index bb698776694f2c..48305bc3fd818a 100644 --- a/client/src/rpc_config.rs +++ b/client/src/rpc_config.rs @@ -9,7 +9,13 @@ pub struct RpcSignatureStatusConfig { pub commitment: Option, } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcSendTransactionConfig { + pub skip_preflight: bool, +} + +#[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RpcSimulateTransactionConfig { pub sig_verify: bool, diff --git a/core/src/lib.rs b/core/src/lib.rs index 2d126634387aaf..a3fbc0bf1f1148 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -42,6 +42,7 @@ pub mod retransmit_stage; pub mod rewards_recorder_service; pub mod rpc; pub mod rpc_error; +pub mod rpc_health; pub mod rpc_pubsub; pub mod rpc_pubsub_service; pub mod rpc_service; diff --git a/core/src/rpc.rs b/core/src/rpc.rs index e6189fb319c947..1e28e7a157a99f 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -6,6 +6,7 @@ use crate::{ contact_info::ContactInfo, non_circulating_supply::calculate_non_circulating_supply, rpc_error::RpcCustomError, + rpc_health::*, storage_stage::StorageState, validator::ValidatorExit, }; @@ -75,6 +76,7 @@ pub struct JsonRpcRequestProcessor { config: JsonRpcConfig, storage_state: StorageState, validator_exit: Arc>>, + health: Arc, } impl JsonRpcRequestProcessor { @@ -130,6 +132,7 @@ impl JsonRpcRequestProcessor { blockstore: Arc, storage_state: StorageState, validator_exit: Arc>>, + health: Arc, ) -> Self { JsonRpcRequestProcessor { config, @@ -138,6 +141,7 @@ impl JsonRpcRequestProcessor { blockstore, storage_state, validator_exit, + health, } } @@ -747,6 +751,19 @@ fn verify_signature(input: &str) -> Result { .map_err(|e| Error::invalid_params(format!("{:?}", e))) } +/// Run transactions against a frozen bank without committing the results +fn run_transaction_simulation( + bank: &Bank, + transactions: &[Transaction], +) -> transaction::Result<()> { + assert!(bank.is_frozen(), "simulation bank must be frozen"); + + let batch = bank.prepare_batch(transactions, None); + let (_loaded_accounts, executed, _retryable_transactions, _transaction_count, _signature_count) = + bank.load_and_execute_transactions(&batch, solana_sdk::clock::MAX_PROCESSING_AGE); + executed[0].0.clone().map(|_| ()) +} + #[derive(Clone)] pub struct Meta { pub request_processor: Arc>, @@ -938,7 +955,12 @@ pub trait RpcSol { ) -> Result; #[rpc(meta, name = "sendTransaction")] - fn send_transaction(&self, meta: Self::Metadata, data: String) -> Result; + fn send_transaction( + &self, + meta: Self::Metadata, + data: String, + config: Option, + ) -> Result; #[rpc(meta, name = "simulateTransaction")] fn simulate_transaction( @@ -1465,8 +1487,45 @@ impl RpcSol for RpcSolImpl { } } - fn send_transaction(&self, meta: Self::Metadata, data: String) -> Result { + fn send_transaction( + &self, + meta: Self::Metadata, + data: String, + config: Option, + ) -> Result { + let config = config.unwrap_or_default(); let (wire_transaction, transaction) = deserialize_bs58_transaction(data)?; + let signature = transaction.signatures[0].to_string(); + + if !config.skip_preflight { + if transaction.verify().is_err() { + return Err(RpcCustomError::SendTransactionPreflightFailure { + message: "Transaction signature verification failed".into(), + } + .into()); + } + + if meta.request_processor.read().unwrap().health.check() != RpcHealthStatus::Ok { + return Err(RpcCustomError::SendTransactionPreflightFailure { + message: "RPC node is unhealthy, unable to simulate transaction".into(), + } + .into()); + } + + let bank = &*meta.request_processor.read().unwrap().bank(None)?; + if let Err(err) = run_transaction_simulation(&bank, &[transaction]) { + // Note: it's possible that the transaction simulation failed but the actual + // transaction would succeed, such as when a transaction depends on an earlier + // transaction that has yet to reach max confirmations. In these cases the user + // should use the config.skip_preflight flag, and potentially in the future + // additional controls over what bank is used for preflight should be exposed. + return Err(RpcCustomError::SendTransactionPreflightFailure { + message: format!("Transaction simulation failed: {}", err), + } + .into()); + } + } + let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let tpu_addr = get_tpu_addr(&meta.cluster_info)?; trace!("send_transaction: leader is {:?}", &tpu_addr); @@ -1476,7 +1535,6 @@ impl RpcSol for RpcSolImpl { info!("send_transaction: send_to error: {:?}", err); Error::internal_error() })?; - let signature = transaction.signatures[0].to_string(); trace!( "send_transaction: sent {} bytes, signature={}", wire_transaction.len(), @@ -1492,10 +1550,7 @@ impl RpcSol for RpcSolImpl { config: Option, ) -> RpcResponse { let (_, transaction) = deserialize_bs58_transaction(data)?; - let config = config.unwrap_or(RpcSimulateTransactionConfig { sig_verify: false }); - - let bank = &*meta.request_processor.read().unwrap().bank(None)?; - assert!(bank.is_frozen()); + let config = config.unwrap_or_default(); let mut result = if config.sig_verify { transaction.verify() @@ -1503,17 +1558,10 @@ impl RpcSol for RpcSolImpl { Ok(()) }; + let bank = &*meta.request_processor.read().unwrap().bank(None)?; + if result.is_ok() { - let transactions = [transaction]; - let batch = bank.prepare_batch(&transactions, None); - let ( - _loaded_accounts, - executed, - _retryable_transactions, - _transaction_count, - _signature_count, - ) = bank.load_and_execute_transactions(&batch, solana_sdk::clock::MAX_PROCESSING_AGE); - result = executed[0].0.clone(); + result = run_transaction_simulation(&bank, &[transaction]); } new_response( @@ -1897,6 +1945,7 @@ pub mod tests { blockstore, StorageState::default(), validator_exit, + RpcHealth::stub(), ))); let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); @@ -1946,6 +1995,7 @@ pub mod tests { blockstore, StorageState::default(), validator_exit, + RpcHealth::stub(), ); thread::spawn(move || { let blockhash = bank.confirmed_last_blockhash().0; @@ -2894,6 +2944,7 @@ pub mod tests { blockstore, StorageState::default(), validator_exit, + RpcHealth::stub(), ); Arc::new(RwLock::new(request_processor)) }, @@ -2908,6 +2959,104 @@ pub mod tests { assert_eq!(error["code"], ErrorCode::InvalidParams.code()); } + #[test] + fn test_rpc_send_transaction_preflight() { + let exit = Arc::new(AtomicBool::new(false)); + let validator_exit = create_validator_exit(&exit); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); + let block_commitment_cache = Arc::new(RwLock::new( + BlockCommitmentCache::default_with_blockstore(blockstore.clone()), + )); + let bank_forks = new_bank_forks().0; + let health = RpcHealth::stub(); + + // Freeze bank 0 to prevent a panic in `run_transaction_simulation()` + bank_forks.write().unwrap().get(0).unwrap().freeze(); + + let mut io = MetaIoHandler::default(); + let rpc = RpcSolImpl; + io.extend_with(rpc.to_delegate()); + let meta = Meta { + request_processor: { + let request_processor = JsonRpcRequestProcessor::new( + JsonRpcConfig::default(), + bank_forks, + block_commitment_cache, + blockstore, + StorageState::default(), + validator_exit, + health.clone(), + ); + Arc::new(RwLock::new(request_processor)) + }, + cluster_info: Arc::new(ClusterInfo::new_with_invalid_keypair( + ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")), + )), + genesis_hash: Hash::default(), + }; + + let mut bad_transaction = + system_transaction::transfer(&Keypair::new(), &Pubkey::default(), 42, Hash::default()); + + // sendTransaction will fail because the blockhash is invalid + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["{}"]}}"#, + bs58::encode(serialize(&bad_transaction).unwrap()).into_string() + ); + let res = io.handle_request_sync(&req, meta.clone()); + assert_eq!( + res, + Some( + r#"{"jsonrpc":"2.0","error":{"code":-32002,"message":"Transaction simulation failed: TransactionError::BlockhashNotFound"},"id":1}"#.to_string(), + ) + ); + + // sendTransaction will fail due to poor node health + health.stub_set_health_status(Some(RpcHealthStatus::Behind)); + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["{}"]}}"#, + bs58::encode(serialize(&bad_transaction).unwrap()).into_string() + ); + let res = io.handle_request_sync(&req, meta.clone()); + assert_eq!( + res, + Some( + r#"{"jsonrpc":"2.0","error":{"code":-32002,"message":"RPC node is unhealthy, unable to simulate transaction"},"id":1}"#.to_string(), + ) + ); + health.stub_set_health_status(None); + + // sendTransaction will fail due to invalid signature + bad_transaction.signatures[0] = Signature::default(); + + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["{}"]}}"#, + bs58::encode(serialize(&bad_transaction).unwrap()).into_string() + ); + let res = io.handle_request_sync(&req, meta.clone()); + assert_eq!( + res, + Some( + r#"{"jsonrpc":"2.0","error":{"code":-32002,"message":"Transaction signature verification failed"},"id":1}"#.to_string(), + ) + ); + + // sendTransaction will now succeed because skipPreflight=true even though it's a bad + // transaction + let req = format!( + r#"{{"jsonrpc":"2.0","id":1,"method":"sendTransaction","params":["{}", {{"skipPreflight": true}}]}}"#, + bs58::encode(serialize(&bad_transaction).unwrap()).into_string() + ); + let res = io.handle_request_sync(&req, meta); + assert_eq!( + res, + Some( + r#"{"jsonrpc":"2.0","result":"1111111111111111111111111111111111111111111111111111111111111111","id":1}"#.to_string(), + ) + ); + } + #[test] fn test_rpc_get_tpu_addr() { let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair( @@ -2964,7 +3113,9 @@ pub mod tests { ) } - pub fn create_validator_exit(exit: &Arc) -> Arc>> { + pub(crate) fn create_validator_exit( + exit: &Arc, + ) -> Arc>> { let mut validator_exit = ValidatorExit::default(); let exit_ = exit.clone(); validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed))); @@ -2987,6 +3138,7 @@ pub mod tests { blockstore, StorageState::default(), validator_exit, + RpcHealth::stub(), ); assert_eq!(request_processor.validator_exit(), Ok(false)); assert_eq!(exit.load(Ordering::Relaxed), false); @@ -3010,6 +3162,7 @@ pub mod tests { blockstore, StorageState::default(), validator_exit, + RpcHealth::stub(), ); assert_eq!(request_processor.validator_exit(), Ok(true)); assert_eq!(exit.load(Ordering::Relaxed), true); @@ -3093,6 +3246,7 @@ pub mod tests { blockstore, StorageState::default(), validator_exit, + RpcHealth::stub(), ); assert_eq!( request_processor.get_block_commitment(0), diff --git a/core/src/rpc_error.rs b/core/src/rpc_error.rs index ab727776ef9e12..276ad96c5f63d5 100644 --- a/core/src/rpc_error.rs +++ b/core/src/rpc_error.rs @@ -3,6 +3,7 @@ use solana_sdk::clock::Slot; const JSON_RPC_SERVER_ERROR_0: i64 = -32000; const JSON_RPC_SERVER_ERROR_1: i64 = -32001; +const JSON_RPC_SERVER_ERROR_2: i64 = -32002; pub enum RpcCustomError { NonexistentClusterRoot { @@ -13,6 +14,9 @@ pub enum RpcCustomError { slot: Slot, first_available_block: Slot, }, + SendTransactionPreflightFailure { + message: String, + }, } impl From for Error { @@ -40,6 +44,11 @@ impl From for Error { ), data: None, }, + RpcCustomError::SendTransactionPreflightFailure { message } => Self { + code: ErrorCode::ServerError(JSON_RPC_SERVER_ERROR_2), + message, + data: None, + }, } } } diff --git a/core/src/rpc_health.rs b/core/src/rpc_health.rs new file mode 100644 index 00000000000000..85d6408e272d0a --- /dev/null +++ b/core/src/rpc_health.rs @@ -0,0 +1,120 @@ +use crate::cluster_info::ClusterInfo; +use solana_sdk::pubkey::Pubkey; +use std::{ + collections::HashSet, + sync::atomic::{AtomicBool, Ordering}, + sync::Arc, +}; + +#[derive(PartialEq, Clone, Copy)] +pub enum RpcHealthStatus { + Ok, + Behind, // Validator is behind its trusted validators +} + +pub struct RpcHealth { + cluster_info: Arc, + trusted_validators: Option>, + health_check_slot_distance: u64, + override_health_check: Arc, + #[cfg(test)] + stub_health_status: std::sync::RwLock>, +} + +impl RpcHealth { + pub fn new( + cluster_info: Arc, + trusted_validators: Option>, + health_check_slot_distance: u64, + override_health_check: Arc, + ) -> Self { + Self { + cluster_info, + trusted_validators, + health_check_slot_distance, + override_health_check, + #[cfg(test)] + stub_health_status: std::sync::RwLock::new(None), + } + } + + pub fn check(&self) -> RpcHealthStatus { + #[cfg(test)] + { + if let Some(stub_health_status) = *self.stub_health_status.read().unwrap() { + return stub_health_status; + } + } + + if self.override_health_check.load(Ordering::Relaxed) { + RpcHealthStatus::Ok + } else if let Some(trusted_validators) = &self.trusted_validators { + let (latest_account_hash_slot, latest_trusted_validator_account_hash_slot) = { + ( + self.cluster_info + .get_accounts_hash_for_node(&self.cluster_info.id(), |hashes| { + hashes + .iter() + .max_by(|a, b| a.0.cmp(&b.0)) + .map(|slot_hash| slot_hash.0) + }) + .flatten() + .unwrap_or(0), + trusted_validators + .iter() + .map(|trusted_validator| { + self.cluster_info + .get_accounts_hash_for_node(&trusted_validator, |hashes| { + hashes + .iter() + .max_by(|a, b| a.0.cmp(&b.0)) + .map(|slot_hash| slot_hash.0) + }) + .flatten() + .unwrap_or(0) + }) + .max() + .unwrap_or(0), + ) + }; + + // This validator is considered healthy if its latest account hash slot is within + // `health_check_slot_distance` of the latest trusted validator's account hash slot + if latest_account_hash_slot > 0 + && latest_trusted_validator_account_hash_slot > 0 + && latest_account_hash_slot + > latest_trusted_validator_account_hash_slot + .saturating_sub(self.health_check_slot_distance) + { + RpcHealthStatus::Ok + } else { + warn!( + "health check: me={}, latest trusted_validator={}", + latest_account_hash_slot, latest_trusted_validator_account_hash_slot + ); + RpcHealthStatus::Behind + } + } else { + // No trusted validator point of reference available, so this validator is healthy + // because it's running + RpcHealthStatus::Ok + } + } + + #[cfg(test)] + pub(crate) fn stub() -> Arc { + Arc::new(Self::new( + Arc::new(ClusterInfo::new_with_invalid_keypair( + crate::contact_info::ContactInfo::default(), + )), + None, + 42, + Arc::new(AtomicBool::new(false)), + )) + } + + #[cfg(test)] + pub(crate) fn stub_set_health_status(&self, stub_health_status: Option) { + *self.stub_health_status.write().unwrap() = stub_health_status; + } +} diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 2d5476eaf9507f..699bc3fdee5219 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -1,7 +1,7 @@ //! The `rpc_service` module implements the Solana JSON RPC service. use crate::{ - cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, + cluster_info::ClusterInfo, commitment::BlockCommitmentCache, rpc::*, rpc_health::*, storage_stage::StorageState, validator::ValidatorExit, }; use jsonrpc_core::MetaIoHandler; @@ -20,7 +20,7 @@ use std::{ collections::HashSet, net::SocketAddr, path::{Path, PathBuf}, - sync::atomic::{AtomicBool, Ordering}, + sync::atomic::AtomicBool, sync::{mpsc::channel, Arc, RwLock}, thread::{self, Builder, JoinHandle}, }; @@ -39,33 +39,24 @@ struct RpcRequestMiddleware { ledger_path: PathBuf, snapshot_archive_path_regex: Regex, snapshot_config: Option, - cluster_info: Arc, - trusted_validators: Option>, bank_forks: Arc>, - health_check_slot_distance: u64, - override_health_check: Arc, + health: Arc, } impl RpcRequestMiddleware { pub fn new( ledger_path: PathBuf, snapshot_config: Option, - cluster_info: Arc, - trusted_validators: Option>, bank_forks: Arc>, - health_check_slot_distance: u64, - override_health_check: Arc, + health: Arc, ) -> Self { Self { ledger_path, snapshot_archive_path_regex: Regex::new(r"/snapshot-\d+-[[:alnum:]]+\.tar\.bz2$") .unwrap(), snapshot_config, - cluster_info, - trusted_validators, bank_forks, - health_check_slot_distance, - override_health_check, + health, } } @@ -136,60 +127,10 @@ impl RpcRequestMiddleware { } fn health_check(&self) -> &'static str { - let response = if self.override_health_check.load(Ordering::Relaxed) { - "ok" - } else if let Some(trusted_validators) = &self.trusted_validators { - let (latest_account_hash_slot, latest_trusted_validator_account_hash_slot) = { - ( - self.cluster_info - .get_accounts_hash_for_node(&self.cluster_info.id(), |hashes| { - hashes - .iter() - .max_by(|a, b| a.0.cmp(&b.0)) - .map(|slot_hash| slot_hash.0) - }) - .flatten() - .unwrap_or(0), - trusted_validators - .iter() - .map(|trusted_validator| { - self.cluster_info - .get_accounts_hash_for_node(&trusted_validator, |hashes| { - hashes - .iter() - .max_by(|a, b| a.0.cmp(&b.0)) - .map(|slot_hash| slot_hash.0) - }) - .flatten() - .unwrap_or(0) - }) - .max() - .unwrap_or(0), - ) - }; - - // This validator is considered healthy if its latest account hash slot is within - // `health_check_slot_distance` of the latest trusted validator's account hash slot - if latest_account_hash_slot > 0 - && latest_trusted_validator_account_hash_slot > 0 - && latest_account_hash_slot - > latest_trusted_validator_account_hash_slot - .saturating_sub(self.health_check_slot_distance) - { - "ok" - } else { - warn!( - "health check: me={}, latest trusted_validator={}", - latest_account_hash_slot, latest_trusted_validator_account_hash_slot - ); - "behind" - } - } else { - // No trusted validator point of reference available, so this validator is healthy - // because it's running - "ok" + let response = match self.health.check() { + RpcHealthStatus::Ok => "ok", + RpcHealthStatus::Behind => "behind", }; - info!("health check: {}", response); response } @@ -299,7 +240,14 @@ impl JsonRpcService { ) -> Self { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); - let health_check_slot_distance = config.health_check_slot_distance; + + let health = Arc::new(RpcHealth::new( + cluster_info.clone(), + trusted_validators, + config.health_check_slot_distance, + override_health_check, + )); + let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new( config, bank_forks.clone(), @@ -307,6 +255,7 @@ impl JsonRpcService { blockstore, storage_state, validator_exit.clone(), + health.clone(), ))); #[cfg(test)] @@ -325,11 +274,8 @@ impl JsonRpcService { let request_middleware = RpcRequestMiddleware::new( ledger_path, snapshot_config, - cluster_info.clone(), - trusted_validators, bank_forks.clone(), - health_check_slot_distance, - override_health_check, + health.clone(), ); let server = ServerBuilder::with_meta_extractor( io, @@ -403,7 +349,10 @@ mod tests { }; use solana_runtime::bank::Bank; use solana_sdk::signature::Signer; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::atomic::Ordering, + }; #[test] fn test_rpc_new() { @@ -482,17 +431,12 @@ mod tests { #[test] fn test_is_file_get_path() { - let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let bank_forks = create_bank_forks(); - let rrm = RpcRequestMiddleware::new( PathBuf::from("/"), None, - cluster_info.clone(), - None, bank_forks.clone(), - 42, - Arc::new(AtomicBool::new(false)), + RpcHealth::stub(), ); let rrm_with_snapshot_config = RpcRequestMiddleware::new( PathBuf::from("/"), @@ -501,11 +445,8 @@ mod tests { snapshot_package_output_path: PathBuf::from("/"), snapshot_path: PathBuf::from("/"), }), - cluster_info, - None, bank_forks, - 42, - Arc::new(AtomicBool::new(false)), + RpcHealth::stub(), ); assert!(rrm.is_file_get_path("/genesis.tar.bz2")); @@ -531,16 +472,11 @@ mod tests { #[test] fn test_health_check_with_no_trusted_validators() { - let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); - let rm = RpcRequestMiddleware::new( PathBuf::from("/"), None, - cluster_info.clone(), - None, create_bank_forks(), - 42, - Arc::new(AtomicBool::new(false)), + RpcHealth::stub(), ); assert_eq!(rm.health_check(), "ok"); } @@ -548,20 +484,18 @@ mod tests { #[test] fn test_health_check_with_trusted_validators() { let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); - let health_check_slot_distance = 123; - let override_health_check = Arc::new(AtomicBool::new(false)); let trusted_validators = vec![Pubkey::new_rand(), Pubkey::new_rand(), Pubkey::new_rand()]; - let rm = RpcRequestMiddleware::new( - PathBuf::from("/"), - None, + + let health = Arc::new(RpcHealth::new( cluster_info.clone(), Some(trusted_validators.clone().into_iter().collect()), - create_bank_forks(), health_check_slot_distance, override_health_check.clone(), - ); + )); + + let rm = RpcRequestMiddleware::new(PathBuf::from("/"), None, create_bank_forks(), health); // No account hashes for this node or any trusted validators == "behind" assert_eq!(rm.health_check(), "behind"); diff --git a/docs/src/apps/jsonrpc-api.md b/docs/src/apps/jsonrpc-api.md index ebe6dcefaec020..765de97de5a6f6 100644 --- a/docs/src/apps/jsonrpc-api.md +++ b/docs/src/apps/jsonrpc-api.md @@ -1156,11 +1156,20 @@ curl -X POST -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","id":1, "m ### sendTransaction -Creates new transaction +Submits a signed transaction to the cluster for processing. + +Before submitting, the following preflight checks are performed: +1. The transaction signatures are verified +2. The transaction is simulated against the latest max confirmed bank +and on failure an error will be returned. Preflight checks may be disabled if +desired. #### Parameters: * `` - fully-signed Transaction, as base-58 encoded string +* `` - (optional) Configuration object containing the following field: + * `skipPreflight: ` - if true, skip the preflight transaction checks (default: false) + #### Results: diff --git a/stake-monitor/src/lib.rs b/stake-monitor/src/lib.rs index 4eea6067f6df79..57c9dd61c37ad6 100644 --- a/stake-monitor/src/lib.rs +++ b/stake-monitor/src/lib.rs @@ -329,6 +329,7 @@ pub fn process_slots(rpc_client: &RpcClient, accounts_info: &mut AccountsInfo, b mod test { use super::*; use serial_test_derive::serial; + use solana_client::rpc_config::RpcSendTransactionConfig; use solana_core::{rpc::JsonRpcConfig, validator::ValidatorConfig}; use solana_local_cluster::local_cluster::{ClusterConfig, LocalCluster}; use solana_sdk::{ @@ -443,20 +444,25 @@ mod test { // Withdraw instruction causes non-compliance let stake3_withdraw_signature = rpc_client - .send_transaction(&Transaction::new( - &[&payer, &stake3_keypair], - Message::new_with_payer( - &[stake_instruction::withdraw( - &stake3_keypair.pubkey(), - &stake3_keypair.pubkey(), - &payer.pubkey(), - one_sol, - None, - )], - Some(&payer.pubkey()), + .send_transaction_with_config( + &Transaction::new( + &[&payer, &stake3_keypair], + Message::new_with_payer( + &[stake_instruction::withdraw( + &stake3_keypair.pubkey(), + &stake3_keypair.pubkey(), + &payer.pubkey(), + one_sol, + None, + )], + Some(&payer.pubkey()), + ), + blockhash, ), - blockhash, - )) + RpcSendTransactionConfig { + skip_preflight: true, + }, + ) .unwrap(); rpc_client @@ -492,19 +498,24 @@ mod test { // Split stake4 into stake5 let stake5_keypair = Keypair::new(); let stake45_split_signature = rpc_client - .send_transaction(&Transaction::new( - &[&payer, &stake5_keypair], - Message::new_with_payer( - &stake_instruction::split( - &stake4_keypair.pubkey(), - &payer.pubkey(), - one_sol, - &stake5_keypair.pubkey(), + .send_transaction_with_config( + &Transaction::new( + &[&payer, &stake5_keypair], + Message::new_with_payer( + &stake_instruction::split( + &stake4_keypair.pubkey(), + &payer.pubkey(), + one_sol, + &stake5_keypair.pubkey(), + ), + Some(&payer.pubkey()), ), - Some(&payer.pubkey()), + blockhash, ), - blockhash, - )) + RpcSendTransactionConfig { + skip_preflight: true, + }, + ) .unwrap(); rpc_client @@ -539,12 +550,17 @@ mod test { // Withdraw 1 sol from system 1 to make it non-compliant rpc_client - .send_transaction(&system_transaction::transfer( - &system1_keypair, - &payer.pubkey(), - one_sol, - blockhash, - )) + .send_transaction_with_config( + &system_transaction::transfer( + &system1_keypair, + &payer.pubkey(), + one_sol, + blockhash, + ), + RpcSendTransactionConfig { + skip_preflight: true, + }, + ) .unwrap(); // System transfer 2 @@ -572,12 +588,17 @@ mod test { // Withdraw 1 sol - 1 lamport from system 2, it's still compliant rpc_client - .send_transaction(&system_transaction::transfer( - &system2_keypair, - &payer.pubkey(), - one_sol - 1, - blockhash, - )) + .send_transaction_with_config( + &system_transaction::transfer( + &system2_keypair, + &payer.pubkey(), + one_sol - 1, + blockhash, + ), + RpcSendTransactionConfig { + skip_preflight: true, + }, + ) .unwrap(); // Process all the transactions diff --git a/tokens/src/thin_client.rs b/tokens/src/thin_client.rs new file mode 100644 index 00000000000000..c1eab4b33de171 --- /dev/null +++ b/tokens/src/thin_client.rs @@ -0,0 +1,179 @@ +use solana_client::{rpc_client::RpcClient, rpc_config::RpcSendTransactionConfig}; +use solana_runtime::bank_client::BankClient; +use solana_sdk::{ + account::Account, + client::{AsyncClient, SyncClient}, + fee_calculator::FeeCalculator, + hash::Hash, + message::Message, + pubkey::Pubkey, + signature::{Signature, Signer}, + signers::Signers, + system_instruction, + sysvar::{ + recent_blockhashes::{self, RecentBlockhashes}, + Sysvar, + }, + transaction::Transaction, + transport::{Result, TransportError}, +}; +use solana_transaction_status::TransactionStatus; + +pub trait Client { + fn send_transaction1(&self, transaction: Transaction) -> Result; + fn get_signature_statuses1( + &self, + signatures: &[Signature], + ) -> Result>>; + fn get_balance1(&self, pubkey: &Pubkey) -> Result; + fn get_recent_blockhash1(&self) -> Result<(Hash, FeeCalculator)>; + fn get_account1(&self, pubkey: &Pubkey) -> Result>; +} + +impl Client for RpcClient { + fn send_transaction1(&self, transaction: Transaction) -> Result { + self.send_transaction_with_config( + &transaction, + RpcSendTransactionConfig { + skip_preflight: true, + }, + ) + .map_err(|e| TransportError::Custom(e.to_string())) + } + + fn get_signature_statuses1( + &self, + signatures: &[Signature], + ) -> Result>> { + self.get_signature_statuses(signatures) + .map(|response| response.value) + .map_err(|e| TransportError::Custom(e.to_string())) + } + + fn get_balance1(&self, pubkey: &Pubkey) -> Result { + self.get_balance(pubkey) + .map_err(|e| TransportError::Custom(e.to_string())) + } + + fn get_recent_blockhash1(&self) -> Result<(Hash, FeeCalculator)> { + self.get_recent_blockhash() + .map_err(|e| TransportError::Custom(e.to_string())) + } + + fn get_account1(&self, pubkey: &Pubkey) -> Result> { + self.get_account(pubkey) + .map(Some) + .map_err(|e| TransportError::Custom(e.to_string())) + } +} + +impl Client for BankClient { + fn send_transaction1(&self, transaction: Transaction) -> Result { + self.async_send_transaction(transaction) + } + + fn get_signature_statuses1( + &self, + signatures: &[Signature], + ) -> Result>> { + signatures + .iter() + .map(|signature| { + self.get_signature_status(signature).map(|opt| { + opt.map(|status| TransactionStatus { + slot: 0, + confirmations: None, + status, + err: None, + }) + }) + }) + .collect() + } + + fn get_balance1(&self, pubkey: &Pubkey) -> Result { + self.get_balance(pubkey) + } + + fn get_recent_blockhash1(&self) -> Result<(Hash, FeeCalculator)> { + self.get_recent_blockhash() + } + + fn get_account1(&self, pubkey: &Pubkey) -> Result> { + self.get_account(pubkey) + } +} + +pub struct ThinClient { + client: C, + dry_run: bool, +} + +impl ThinClient { + pub fn new(client: C, dry_run: bool) -> Self { + Self { client, dry_run } + } + + pub fn send_transaction(&self, transaction: Transaction) -> Result { + if self.dry_run { + return Ok(Signature::default()); + } + self.client.send_transaction1(transaction) + } + + pub fn poll_for_confirmation(&self, signature: &Signature) -> Result<()> { + while self.get_signature_statuses(&[*signature])?[0].is_none() { + std::thread::sleep(std::time::Duration::from_millis(500)); + } + Ok(()) + } + + pub fn get_signature_statuses( + &self, + signatures: &[Signature], + ) -> Result>> { + self.client.get_signature_statuses1(signatures) + } + + pub fn send_message(&self, message: Message, signers: &S) -> Result { + if self.dry_run { + return Ok(Transaction::new_unsigned(message)); + } + let (blockhash, _fee_caluclator) = self.get_recent_blockhash()?; + let transaction = Transaction::new(signers, message, blockhash); + self.send_transaction(transaction.clone())?; + Ok(transaction) + } + + pub fn transfer( + &self, + lamports: u64, + sender_keypair: &S, + to_pubkey: &Pubkey, + ) -> Result { + let create_instruction = + system_instruction::transfer(&sender_keypair.pubkey(), &to_pubkey, lamports); + let message = Message::new(&[create_instruction]); + self.send_message(message, &[sender_keypair]) + } + + pub fn get_recent_blockhash(&self) -> Result<(Hash, FeeCalculator)> { + self.client.get_recent_blockhash1() + } + + pub fn get_balance(&self, pubkey: &Pubkey) -> Result { + self.client.get_balance1(pubkey) + } + + pub fn get_account(&self, pubkey: &Pubkey) -> Result> { + self.client.get_account1(pubkey) + } + + pub fn get_recent_blockhashes(&self) -> Result> { + let opt_blockhashes_account = self.get_account(&recent_blockhashes::id())?; + let blockhashes_account = opt_blockhashes_account.unwrap(); + let recent_blockhashes = RecentBlockhashes::from_account(&blockhashes_account).unwrap(); + let hashes = recent_blockhashes.iter().map(|x| x.blockhash).collect(); + Ok(hashes) + } +}