Skip to content

Commit

Permalink
generic scheduler (#3611)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Nov 25, 2024
1 parent ee3f3da commit 563d81f
Show file tree
Hide file tree
Showing 10 changed files with 669 additions and 490 deletions.
11 changes: 10 additions & 1 deletion core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ use {
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
transaction_scheduler::{
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
transaction_state_container::TransactionStateContainer,
},
};

// Below modules are pub to allow use by banking_stage bench
Expand Down Expand Up @@ -612,10 +616,15 @@ impl BankingStage {
// Spawn the central scheduler thread
bank_thread_hdls.push({
let packet_deserializer = PacketDeserializer::new(non_vote_receiver);
let receive_and_buffer = SanitizedTransactionReceiveAndBuffer::new(
packet_deserializer,
bank_forks.clone(),
forwarder.is_some(),
);
let scheduler = PrioGraphScheduler::new(work_senders, finished_work_receiver);
let scheduler_controller = SchedulerController::new(
decision_maker.clone(),
packet_deserializer,
receive_and_buffer,
bank_forks,
scheduler,
worker_metrics,
Expand Down
44 changes: 27 additions & 17 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
solana_measure::measure_us,
solana_poh::leader_bank_notifier::LeaderBankNotifier,
solana_runtime::bank::Bank,
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
solana_sdk::clock::Slot,
solana_svm::transaction_error_metrics::TransactionErrorMetrics,
std::{
Expand All @@ -21,28 +22,28 @@ use {
};

#[derive(Debug, Error)]
pub enum ConsumeWorkerError {
pub enum ConsumeWorkerError<Tx> {
#[error("Failed to receive work from scheduler: {0}")]
Recv(#[from] RecvError),
#[error("Failed to send finalized consume work to scheduler: {0}")]
Send(#[from] SendError<FinishedConsumeWork>),
Send(#[from] SendError<FinishedConsumeWork<Tx>>),
}

pub(crate) struct ConsumeWorker {
consume_receiver: Receiver<ConsumeWork>,
pub(crate) struct ConsumeWorker<Tx> {
consume_receiver: Receiver<ConsumeWork<Tx>>,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork>,
consumed_sender: Sender<FinishedConsumeWork<Tx>>,

leader_bank_notifier: Arc<LeaderBankNotifier>,
metrics: Arc<ConsumeWorkerMetrics>,
}

impl ConsumeWorker {
impl<Tx: TransactionWithMeta> ConsumeWorker<Tx> {
pub fn new(
id: u32,
consume_receiver: Receiver<ConsumeWork>,
consume_receiver: Receiver<ConsumeWork<Tx>>,
consumer: Consumer,
consumed_sender: Sender<FinishedConsumeWork>,
consumed_sender: Sender<FinishedConsumeWork<Tx>>,
leader_bank_notifier: Arc<LeaderBankNotifier>,
) -> Self {
Self {
Expand All @@ -58,14 +59,14 @@ impl ConsumeWorker {
self.metrics.clone()
}

pub fn run(self) -> Result<(), ConsumeWorkerError> {
pub fn run(self) -> Result<(), ConsumeWorkerError<Tx>> {
loop {
let work = self.consume_receiver.recv()?;
self.consume_loop(work)?;
}
}

fn consume_loop(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
fn consume_loop(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
let (maybe_consume_bank, get_bank_us) = measure_us!(self.get_consume_bank());
let Some(mut bank) = maybe_consume_bank else {
self.metrics
Expand Down Expand Up @@ -103,7 +104,11 @@ impl ConsumeWorker {
}

/// Consume a single batch.
fn consume(&self, bank: &Arc<Bank>, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
fn consume(
&self,
bank: &Arc<Bank>,
work: ConsumeWork<Tx>,
) -> Result<(), ConsumeWorkerError<Tx>> {
let output = self.consumer.process_and_record_aged_transactions(
bank,
&work.transactions,
Expand All @@ -130,15 +135,15 @@ impl ConsumeWorker {
}

/// Retry current batch and all outstanding batches.
fn retry_drain(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
fn retry_drain(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
for work in try_drain_iter(work, &self.consume_receiver) {
self.retry(work)?;
}
Ok(())
}

/// Send transactions back to scheduler as retryable.
fn retry(&self, work: ConsumeWork) -> Result<(), ConsumeWorkerError> {
fn retry(&self, work: ConsumeWork<Tx>) -> Result<(), ConsumeWorkerError<Tx>> {
let retryable_indexes: Vec<_> = (0..work.transactions.len()).collect();
let num_retryable = retryable_indexes.len();
self.metrics
Expand Down Expand Up @@ -774,7 +779,9 @@ mod tests {
signature::Keypair,
signer::Signer,
system_instruction, system_transaction,
transaction::{MessageHash, TransactionError, VersionedTransaction},
transaction::{
MessageHash, SanitizedTransaction, TransactionError, VersionedTransaction,
},
},
solana_svm_transaction::svm_message::SVMMessage,
std::{
Expand All @@ -798,11 +805,14 @@ mod tests {
_poh_simulator: JoinHandle<()>,
_replay_vote_receiver: ReplayVoteReceiver,

consume_sender: Sender<ConsumeWork>,
consumed_receiver: Receiver<FinishedConsumeWork>,
consume_sender: Sender<ConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
consumed_receiver: Receiver<FinishedConsumeWork<RuntimeTransaction<SanitizedTransaction>>>,
}

fn setup_test_frame() -> (TestFrame, ConsumeWorker) {
fn setup_test_frame() -> (
TestFrame,
ConsumeWorker<RuntimeTransaction<SanitizedTransaction>>,
) {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
Expand Down
7 changes: 3 additions & 4 deletions core/src/banking_stage/read_write_account_set.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use {
ahash::AHashSet,
solana_sdk::{message::SanitizedMessage, pubkey::Pubkey},
ahash::AHashSet, solana_sdk::pubkey::Pubkey, solana_svm_transaction::svm_message::SVMMessage,
};

/// Wrapper struct to accumulate locks for a batch of transactions.
Expand All @@ -14,7 +13,7 @@ pub struct ReadWriteAccountSet {

impl ReadWriteAccountSet {
/// Returns true if all account locks were available and false otherwise.
pub fn check_locks(&self, message: &SanitizedMessage) -> bool {
pub fn check_locks(&self, message: &impl SVMMessage) -> bool {
message
.account_keys()
.iter()
Expand All @@ -30,7 +29,7 @@ impl ReadWriteAccountSet {

/// Add all account locks.
/// Returns true if all account locks were available and false otherwise.
pub fn take_locks(&mut self, message: &SanitizedMessage) -> bool {
pub fn take_locks(&mut self, message: &impl SVMMessage) -> bool {
message
.account_keys()
.iter()
Expand Down
14 changes: 5 additions & 9 deletions core/src/banking_stage/scheduler_messages.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use {
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_sdk::{
clock::{Epoch, Slot},
transaction::SanitizedTransaction,
},
solana_sdk::clock::{Epoch, Slot},
std::fmt::Display,
};

Expand Down Expand Up @@ -54,16 +50,16 @@ impl MaxAge {

/// Message: [Scheduler -> Worker]
/// Transactions to be consumed (i.e. executed, recorded, and committed)
pub struct ConsumeWork {
pub struct ConsumeWork<Tx> {
pub batch_id: TransactionBatchId,
pub ids: Vec<TransactionId>,
pub transactions: Vec<RuntimeTransaction<SanitizedTransaction>>,
pub transactions: Vec<Tx>,
pub max_ages: Vec<MaxAge>,
}

/// Message: [Worker -> Scheduler]
/// Processed transactions.
pub struct FinishedConsumeWork {
pub work: ConsumeWork,
pub struct FinishedConsumeWork<Tx> {
pub work: ConsumeWork<Tx>,
pub retryable_indexes: Vec<usize>,
}
5 changes: 3 additions & 2 deletions core/src/banking_stage/transaction_scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
mod batch_id_generator;
mod in_flight_tracker;
pub(crate) mod prio_graph_scheduler;
pub(crate) mod receive_and_buffer;
pub(crate) mod scheduler_controller;
pub(crate) mod scheduler_error;
mod scheduler_metrics;
mod thread_aware_account_locks;
mod transaction_id_generator;
pub(crate) mod transaction_id_generator;
mod transaction_priority_id;
mod transaction_state;
mod transaction_state_container;
pub(crate) mod transaction_state_container;
Loading

0 comments on commit 563d81f

Please sign in to comment.