Skip to content

Commit

Permalink
optimize retry pool
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Dec 2, 2024
1 parent dbbe367 commit c7c05af
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 54 deletions.
2 changes: 1 addition & 1 deletion local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
debug_keys: config.debug_keys.clone(),
contact_debug_interval: config.contact_debug_interval,
contact_save_interval: config.contact_save_interval,
send_transaction_service_config: config.send_transaction_service_config.clone(),
send_transaction_service_config: config.send_transaction_service_config,
no_poh_speed_test: config.no_poh_speed_test,
no_os_memory_stats_reporting: config.no_os_memory_stats_reporting,
no_os_network_stats_reporting: config.no_os_network_stats_reporting,
Expand Down
137 changes: 84 additions & 53 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ use {
hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature,
},
std::{
collections::{
hash_map::{Entry, HashMap},
HashSet,
},
collections::hash_map::{Entry, HashMap},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -93,6 +90,16 @@ impl TransactionInfo {
last_sent_time,
}
}

/// Resolves the retry limit that applies to this transaction.
///
/// The transaction's own `max_retries` takes precedence; when it is unset,
/// `default_max_retries` is used instead. Whichever value is selected is
/// capped at `service_max_retries`. Returns `None` when neither the
/// transaction nor the default specifies a limit.
fn get_max_retries(
    &self,
    default_max_retries: Option<usize>,
    service_max_retries: usize,
) -> Option<usize> {
    let requested = match self.max_retries {
        Some(limit) => limit,
        // No per-transaction limit: fall back to the default, or bail out
        // with `None` if that is unset as well.
        None => default_max_retries?,
    };
    Some(requested.min(service_max_retries))
}
}

#[derive(Default, Debug, PartialEq, Eq)]
Expand All @@ -103,6 +110,7 @@ struct ProcessTransactionsResult {
max_retries_elapsed: u64,
failed: u64,
retained: u64,
last_sent_time: Option<Instant>,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -137,7 +145,7 @@ impl Default for Config {

/// The maximum duration the retry thread may be configured to sleep before
/// processing the transactions that need to be retried.
pub const MAX_RETRY_SLEEP_MS: u64 = 1000;
pub const MAX_RETRY_SLEEP_MS: u64 = 1_000;

impl SendTransactionService {
pub fn new<T: TpuInfo + std::marker::Send + 'static>(
Expand Down Expand Up @@ -192,6 +200,8 @@ impl SendTransactionService {
client.clone(),
retry_transactions.clone(),
stats_report.clone(),
config.service_max_retries,
config.default_max_retries,
config.batch_send_rate_ms,
config.batch_size,
config.retry_pool_max_size,
Expand Down Expand Up @@ -222,6 +232,8 @@ impl SendTransactionService {
client: ConnectionCacheClient<T>,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
stats_report: Arc<SendTransactionServiceStatsReport>,
service_max_retries: usize,
default_max_retries: Option<usize>,
batch_send_rate_ms: u64,
batch_size: usize,
retry_pool_max_size: usize,
Expand Down Expand Up @@ -285,9 +297,17 @@ impl SendTransactionService {
{
// Take the lock on retry_transactions and move the batch into the retry set.
let mut retry_transactions = retry_transactions.lock().unwrap();
let transactions_to_retry = transactions.len();
let mut transactions_to_retry: usize = 0;
let mut transactions_added_to_retry: usize = 0;
for (signature, mut transaction_info) in transactions.drain() {
// drop transactions with 0 max retries
let max_retries = transaction_info
.get_max_retries(default_max_retries, service_max_retries);
if max_retries == Some(0) {
continue;
}
transactions_to_retry += 1;

let retry_len = retry_transactions.len();
let entry = retry_transactions.entry(signature);
if let Entry::Vacant(_) = entry {
Expand Down Expand Up @@ -329,19 +349,20 @@ impl SendTransactionService {
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
info!("Starting send-transaction-service::retry_thread with config.");
let retry_interval_ms_default = MAX_RETRY_SLEEP_MS.min(retry_rate_ms);
let mut retry_interval_ms = retry_interval_ms_default;
Builder::new()
.name("solStxRetry".to_string())
.spawn(move || loop {
let retry_interval_ms = retry_rate_ms;
let stats = &stats_report.stats;
sleep(Duration::from_millis(
MAX_RETRY_SLEEP_MS.min(retry_interval_ms),
));
sleep(Duration::from_millis(retry_interval_ms));
if exit.load(Ordering::Relaxed) {
break;
}
let mut transactions = retry_transactions.lock().unwrap();
if !transactions.is_empty() {
if transactions.is_empty() {
retry_interval_ms = retry_interval_ms_default;
} else {
let stats = &stats_report.stats;
stats
.retry_queue_size
.store(transactions.len() as u64, Ordering::Relaxed);
Expand All @@ -350,7 +371,7 @@ impl SendTransactionService {
(bank_forks.root_bank(), bank_forks.working_bank())
};

let _result = Self::process_transactions(
let result = Self::process_transactions(
&working_bank,
&root_bank,
&mut transactions,
Expand All @@ -362,6 +383,17 @@ impl SendTransactionService {
stats,
);
stats_report.report();

// Shorten the next sleep by the time already elapsed since the earliest
// not-yet-due send, so that transaction is retried as soon as it becomes due.
retry_interval_ms = retry_interval_ms_default
.checked_sub(
result
.last_sent_time
.and_then(|last| Instant::now().checked_duration_since(last))
.and_then(|interval| interval.as_millis().try_into().ok())
.unwrap_or(0),
)
.unwrap_or(retry_interval_ms_default);
}
})
.unwrap()
Expand All @@ -381,7 +413,8 @@ impl SendTransactionService {
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();

let mut batched_transactions = HashSet::new();
let mut batched_transactions = Vec::new();
let mut exceeded_retries_transactions = Vec::new();
let retry_rate = Duration::from_millis(retry_rate_ms);

transactions.retain(|signature, transaction_info| {
Expand All @@ -400,7 +433,8 @@ impl SendTransactionService {
let now = Instant::now();
let expired = transaction_info
.last_sent_time
.map(|last| now.duration_since(last) >= retry_rate)
.and_then(|last| now.checked_duration_since(last))
.map(|elapsed| elapsed >= retry_rate)
.unwrap_or(false);
let verify_nonce_account =
nonce_account::verify_nonce_account(&nonce_account, &durable_nonce);
Expand Down Expand Up @@ -439,21 +473,36 @@ impl SendTransactionService {
let now = Instant::now();
let need_send = transaction_info
.last_sent_time
.map(|last| now.duration_since(last) >= retry_rate)
.and_then(|last| now.checked_duration_since(last))
.map(|elapsed| elapsed >= retry_rate)
.unwrap_or(true);
if need_send {
if transaction_info.last_sent_time.is_some() {
// Transaction sent before is unknown to the working bank, it might have been
// dropped or landed in another fork. Re-send it
// dropped or landed in another fork. Re-send it.

info!("Retrying transaction: {}", signature);
result.retried += 1;
transaction_info.retries += 1;
stats.retries.fetch_add(1, Ordering::Relaxed);
}

batched_transactions.insert(*signature);
batched_transactions.push(*signature);
transaction_info.last_sent_time = Some(now);

let max_retries = transaction_info
.get_max_retries(default_max_retries, service_max_retries);
if let Some(max_retries) = max_retries {
if transaction_info.retries >= max_retries {
exceeded_retries_transactions.push(*signature);
}
}
} else if let Some(last) = transaction_info.last_sent_time {
result.last_sent_time = Some(
result
.last_sent_time
.map(|result_last| result_last.min(last))
.unwrap_or(last),
);
}
true
}
Expand All @@ -471,19 +520,31 @@ impl SendTransactionService {
}
});

stats.retries.fetch_add(result.retried, Ordering::Relaxed);

if !batched_transactions.is_empty() {
// Processing the transactions in batch
let wire_transactions = transactions
let wire_transactions = batched_transactions
.iter()
.filter(|(signature, _)| batched_transactions.contains(signature))
.map(|(_, transaction_info)| transaction_info.wire_transaction.clone());
.filter_map(|signature| transactions.get(signature))
.map(|transaction_info| transaction_info.wire_transaction.clone());

let iter = wire_transactions.chunks(batch_size);
for chunk in &iter {
let chunk = chunk.collect();
client.send_transactions_in_batch(chunk, stats);
}
}

result.max_retries_elapsed += exceeded_retries_transactions.len() as u64;
stats
.transactions_exceeding_max_retries
.fetch_add(result.max_retries_elapsed, Ordering::Relaxed);
for signature in exceeded_retries_transactions {
info!("Dropping transaction due to max retries: {signature}");
transactions.remove(&signature);
}

result
}

Expand Down Expand Up @@ -811,17 +872,6 @@ mod test {
transactions.clear();

info!("Transactions are only retried until max_retries");
transactions.insert(
Signature::from([1; 64]),
TransactionInfo::new(
Signature::default(),
vec![],
working_bank.block_height(),
None,
Some(0),
Some(Instant::now()),
),
);
transactions.insert(
Signature::from([2; 64]),
TransactionInfo::new(
Expand All @@ -844,30 +894,11 @@ mod test {
config.batch_size,
&stats,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
result,
ProcessTransactionsResult {
retried: 1,
max_retries_elapsed: 1,
..ProcessTransactionsResult::default()
}
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank,
&root_bank,
&mut transactions,
&client,
config.retry_rate_ms,
config.service_max_retries,
config.default_max_retries,
config.batch_size,
&stats,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
retried: 1,
max_retries_elapsed: 1,
..ProcessTransactionsResult::default()
}
Expand Down

0 comments on commit c7c05af

Please sign in to comment.