diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index dd09ccc69698f5..7a48ba0fd27781 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -114,6 +114,12 @@ pub struct Config { pub batch_size: usize, /// How frequently batches are sent pub batch_send_rate_ms: u64, +<<<<<<< HEAD +======= + /// When the retry pool exceeds this max size, new transactions are dropped after their first broadcast attempt + pub retry_pool_max_size: usize, + pub tpu_peers: Option>, +>>>>>>> f41fb84e15 (rpc-sts: add config options for stake-weighted qos (#197)) } impl Default for Config { @@ -125,6 +131,11 @@ impl Default for Config { service_max_retries: DEFAULT_SERVICE_MAX_RETRIES, batch_size: DEFAULT_TRANSACTION_BATCH_SIZE, batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS, +<<<<<<< HEAD +======= + retry_pool_max_size: MAX_TRANSACTION_RETRY_POOL_SIZE, + tpu_peers: None, +>>>>>>> f41fb84e15 (rpc-sts: add config options for stake-weighted qos (#197)) } } } @@ -565,12 +576,18 @@ impl SendTransactionService { stats: &SendTransactionServiceStats, ) { // Processing the transactions in batch - let addresses = Self::get_tpu_addresses_with_slots( + let mut addresses = config + .tpu_peers + .as_ref() + .map(|addrs| addrs.iter().map(|a| (a, 0)).collect::>()) + .unwrap_or_default(); + let leader_addresses = Self::get_tpu_addresses_with_slots( tpu_address, leader_info, config, connection_cache.protocol(), ); + addresses.extend(leader_addresses); let wire_transactions = transactions .iter() @@ -583,8 +600,8 @@ impl SendTransactionService { }) .collect::>(); - for address in &addresses { - Self::send_transactions(address.0, &wire_transactions, connection_cache, stats); + for (address, _) in &addresses { + Self::send_transactions(address, &wire_transactions, connection_cache, stats); } } @@ -701,14 +718,20 @@ impl SendTransactionService { let iter = wire_transactions.chunks(config.batch_size); for chunk in iter { + let mut addresses = config + .tpu_peers + .as_ref() + .map(|addrs| addrs.iter().collect::>()) + .unwrap_or_default(); let mut leader_info_provider = leader_info_provider.lock().unwrap(); let leader_info = leader_info_provider.get_leader_info(); - let addresses = Self::get_tpu_addresses( + let leader_addresses = Self::get_tpu_addresses( tpu_address, leader_info, config, connection_cache.protocol(), ); + addresses.extend(leader_addresses); for address in &addresses { Self::send_transactions(address, chunk, connection_cache, stats); diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 08a7288843d803..1368077b31b98f 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1046,6 +1046,34 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .help("The size of transactions to be sent in batch."), ) .arg( +<<<<<<< HEAD +======= + Arg::with_name("rpc_send_transaction_retry_pool_max_size") + .long("rpc-send-transaction-retry-pool-max-size") + .value_name("NUMBER") + .takes_value(true) + .validator(is_parsable::) + .default_value(&default_args.rpc_send_transaction_retry_pool_max_size) + .help("The maximum size of transactions retry pool."), + ) + .arg( + Arg::with_name("rpc_send_transaction_tpu_peer") + .long("rpc-send-transaction-tpu-peer") + .takes_value(true) + .number_of_values(1) + .multiple(true) + .value_name("HOST:PORT") + .validator(solana_net_utils::is_host_port) + .help("Peer(s) to broadcast transactions to instead of the current leader") + ) + .arg( + Arg::with_name("rpc_send_transaction_also_leader") + .long("rpc-send-transaction-also-leader") + .requires("rpc_send_transaction_tpu_peer") + .help("With `--rpc-send-transaction-tpu-peer HOST:PORT`, also send to the current leader") + ) + .arg( +>>>>>>> f41fb84e15 (rpc-sts: add config options for stake-weighted qos (#197)) Arg::with_name("rpc_scan_and_fix_roots") .long("rpc-scan-and-fix-roots") .takes_value(false) diff --git a/validator/src/main.rs b/validator/src/main.rs index 289277a45494ff..41cb2e09c320f7 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1281,6 +1281,27 @@ pub fn main() { ); exit(1); } + let rpc_send_transaction_tpu_peers = matches + .values_of("rpc_send_transaction_tpu_peer") + .map(|values| { + values + .map(solana_net_utils::parse_host_port) + .collect::, String>>() + }) + .transpose() + .unwrap_or_else(|e| { + eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}"); + exit(1); + }); + let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader"); + let leader_forward_count = + if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader { + // rpc-sts is configured to send only to specific tpu peers. disable leader forwards + 0 + } else { + value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64) + }; + let full_api = matches.is_present("full_rpc_api"); let mut validator_config = ValidatorConfig { @@ -1374,11 +1395,7 @@ pub fn main() { contact_debug_interval, send_transaction_service_config: send_transaction_service::Config { retry_rate_ms: rpc_send_retry_rate_ms, - leader_forward_count: value_t_or_exit!( - matches, - "rpc_send_transaction_leader_forward_count", - u64 - ), + leader_forward_count, default_max_retries: value_t!( matches, "rpc_send_transaction_default_max_retries", @@ -1392,6 +1409,15 @@ pub fn main() { ), batch_send_rate_ms: rpc_send_batch_send_rate_ms, batch_size: rpc_send_batch_size, +<<<<<<< HEAD +======= + retry_pool_max_size: value_t_or_exit!( + matches, + "rpc_send_transaction_retry_pool_max_size", + usize + ), + tpu_peers: rpc_send_transaction_tpu_peers, +>>>>>>> f41fb84e15 (rpc-sts: add config options for stake-weighted qos (#197)) }, no_poh_speed_test: matches.is_present("no_poh_speed_test"), no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),