Skip to content

Commit

Permalink
use the same method as banking stage to move buffered packets in and out
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalu committed May 21, 2022
1 parent 5638351 commit 3da4c55
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 60 deletions.
108 changes: 56 additions & 52 deletions core/src/transaction_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//! - TPU vote transactions
//! - Gossip vote transactions
use min_max_heap::MinMaxHeap;
use std::rc::Rc;
use {
crate::{
banking_stage::BatchedTransactionDetails,
Expand Down Expand Up @@ -366,7 +368,7 @@ impl TransactionScheduler {

/// Attempts to schedule a transaction to be executed.
fn try_schedule(
deserialized_packet: &DeserializedPacket,
deserialized_packet: &Rc<ImmutableDeserializedPacket>,
bank: &Arc<Bank>,
highest_wl_blocked_account_fees: &mut HashMap<Pubkey, u64>,
highest_rl_blocked_account_fees: &mut HashMap<Pubkey, u64>,
Expand All @@ -375,14 +377,14 @@ impl TransactionScheduler {
config: &TransactionSchedulerConfig,
) -> Result<SanitizedTransaction> {
let sanitized_tx = Self::transaction_from_deserialized_packet(
deserialized_packet.immutable_section(),
deserialized_packet,
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
)
.ok_or_else(|| SchedulerError::InvalidSanitizedTransaction)?;

let priority = deserialized_packet.immutable_section().priority();
let priority = deserialized_packet.priority();

let account_locks = sanitized_tx
.get_account_locks(&bank.feature_set)
Expand Down Expand Up @@ -665,60 +667,68 @@ impl TransactionScheduler {
bank: &Arc<Bank>,
qos_service: &QosService,
config: &TransactionSchedulerConfig,
) -> (Vec<SanitizedTransaction>, Vec<DeserializedPacket>) {
) -> Vec<SanitizedTransaction> {
let mut sanitized_transactions = Vec::with_capacity(num_txs);
let mut rescheduled_packets = Vec::with_capacity(1_000);

// hashmap representing the highest fee of currently write-locked and read-locked blocked accounts
// almost a pseudo AccountLocks but fees instead of hashset/read lock count
let mut highest_wl_blocked_account_fees = HashMap::with_capacity(10_000);
let mut highest_rl_blocked_account_fees = HashMap::with_capacity(10_000);

let mut lookback_count = 0;
let mut retryable_packets = MinMaxHeap::with_capacity(unprocessed_packets.capacity());
std::mem::swap(
&mut unprocessed_packets.packet_priority_queue,
&mut retryable_packets,
);

while let Some(deserialized_packet) = unprocessed_packets.pop_max() {
match Self::try_schedule(
&deserialized_packet,
bank,
&mut highest_wl_blocked_account_fees,
&mut highest_rl_blocked_account_fees,
scheduled_accounts,
qos_service,
config,
) {
Ok(sanitized_tx) => {
sanitized_transactions.push(sanitized_tx);
if sanitized_transactions.len() >= num_txs {
break;
}
let mut retryable_packets: MinMaxHeap<_> = retryable_packets
.drain_desc()
.into_iter()
.enumerate()
.filter_map(|(idx, deserialized_packet)| {
// drop the rest
if sanitized_transactions.len() >= num_txs {
return Some(deserialized_packet);
}
Err(e) => {
trace!("e: {:?}", e);
match e {
SchedulerError::InvalidSanitizedTransaction
| SchedulerError::InvalidTransactionFormat(_)
| SchedulerError::TransactionCheckFailed(_) => {
// non-recoverable error, drop the packet
continue;
}
SchedulerError::AccountInUse | SchedulerError::AccountBlocked(_) => {
// need to reschedule
// error!("e: {}", e);
rescheduled_packets.push(deserialized_packet);
}
SchedulerError::QosExceeded => {
// TODO: probably want to move this packet to a separate queue
match Self::try_schedule(
&deserialized_packet,
bank,
&mut highest_wl_blocked_account_fees,
&mut highest_rl_blocked_account_fees,
scheduled_accounts,
qos_service,
config,
) {
Ok(sanitized_tx) => {
sanitized_transactions.push(sanitized_tx);
return None;
}
Err(e) => {
trace!("e: {:?}", e);
match e {
SchedulerError::InvalidSanitizedTransaction
| SchedulerError::InvalidTransactionFormat(_)
| SchedulerError::TransactionCheckFailed(_) => {
// non-recoverable error, drop the packet
return None;
}
SchedulerError::AccountInUse
| SchedulerError::AccountBlocked(_)
| SchedulerError::QosExceeded => {
return Some(deserialized_packet);
}
}
}
}
}
lookback_count += 1;
if lookback_count >= num_txs * 20 {
break;
}
}
})
.collect();

std::mem::swap(
&mut retryable_packets,
&mut unprocessed_packets.packet_priority_queue,
);

(sanitized_transactions, rescheduled_packets)
sanitized_transactions
}

/// Handles scheduler requests and sends back a response over the channel
Expand All @@ -733,7 +743,7 @@ impl TransactionScheduler {
match scheduler_request.msg {
SchedulerMessage::RequestBatch { num_txs, bank } => {
trace!("SchedulerMessage::RequestBatch num_txs: {}", num_txs);
let (sanitized_transactions, rescheduled_packets) = Self::get_scheduled_batch(
let sanitized_transactions = Self::get_scheduled_batch(
unprocessed_packets,
scheduled_accounts,
num_txs,
Expand All @@ -742,9 +752,8 @@ impl TransactionScheduler {
config,
);
trace!(
"sanitized_transactions num: {}, rescheduled_packets num: {}, unprocessed_packets num: {}",
"sanitized_transactions num: {}, unprocessed_packets num: {}",
sanitized_transactions.len(),
rescheduled_packets.len(),
unprocessed_packets.len()
);

Expand All @@ -753,11 +762,6 @@ impl TransactionScheduler {
sanitized_transactions,
}))
.unwrap();

// push rescheduled back on after sending so execution can start
for tx in rescheduled_packets {
unprocessed_packets.push(tx.clone());
}
}
SchedulerMessage::Ping { id } => {
let _ = response_sender
Expand Down
16 changes: 8 additions & 8 deletions transaction-scheduler-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,35 @@ struct Args {
packet_send_rate: usize,

/// Number of packets per batch
#[clap(long, env, default_value_t = 64)]
#[clap(long, env, default_value_t = 128)]
packets_per_batch: usize,

/// Number of batches per message
#[clap(long, env, default_value_t = 20)]
batches_per_msg: usize,

/// Number of consuming threads (number of threads requesting batches from scheduler)
#[clap(long, env, default_value_t = 20)]
#[clap(long, env, default_value_t = 4)]
num_execution_threads: usize,

/// How long each transaction takes to execution in microseconds
#[clap(long, env, default_value_t = 20)]
#[clap(long, env, default_value_t = 15)]
execution_per_tx_us: u64,

/// Duration of benchmark
#[clap(long, env, default_value_t = 20.0)]
duration: f32,

/// Number of accounts to choose from when signing transactions
#[clap(long, env, default_value_t = 1000)]
#[clap(long, env, default_value_t = 100000)]
num_accounts: usize,

/// Number of read locks per tx
#[clap(long, env, default_value_t = 5)]
#[clap(long, env, default_value_t = 4)]
num_read_locks_per_tx: usize,

/// Number of write locks per tx
#[clap(long, env, default_value_t = 5)]
#[clap(long, env, default_value_t = 2)]
num_read_write_locks_per_tx: usize,

/// If true, enables state auction on accounts
Expand Down Expand Up @@ -261,7 +261,7 @@ fn start_execution_threads(
break;
}

let batch = scheduler_handle.request_batch(64, &bank).unwrap();
let batch = scheduler_handle.request_batch(128, &bank).unwrap();

let num_txs = batch.sanitized_transactions.len();
let sleep_duration = if num_txs > 0 {
Expand Down Expand Up @@ -348,7 +348,7 @@ fn main() {
let mut last_log_time = Instant::now();

for msg in stats_receiver {
// info!("msg: {}", msg);
info!("msg: {}", msg);
count += msg;

if last_log_time.elapsed() > Duration::from_secs(1) {
Expand Down

0 comments on commit 3da4c55

Please sign in to comment.